Refactors operations into an easier build pattern. Pipeline still needs some work.
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/6d54dffc Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/6d54dffc Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/6d54dffc Branch: refs/heads/USERGRID-608 Commit: 6d54dffc4e9178b85349ec591275c9005ad121ed Parents: 3a1784f Author: Todd Nine <tn...@apigee.com> Authored: Wed May 20 15:00:38 2015 -0600 Committer: Todd Nine <tn...@apigee.com> Committed: Wed May 20 19:24:50 2015 -0600 ---------------------------------------------------------------------- .../corepersistence/CpEntityManager.java | 16 +- .../corepersistence/CpEntityManagerFactory.java | 12 +- .../corepersistence/CpRelationManager.java | 93 +++++--- .../pipeline/FilterPipeline.java | 107 +++++++++ .../pipeline/PipelineModule.java | 2 - .../pipeline/PipelineOperation.java | 2 +- .../pipeline/builder/CandidateBuilder.java | 67 ++++++ .../pipeline/builder/ConnectionBuilder.java | 37 +++ .../pipeline/builder/ConnectionRefBuilder.java | 53 +++++ .../pipeline/builder/EntityBuilder.java | 51 ++++ .../pipeline/builder/IdBuilder.java | 147 ++++++++++++ .../pipeline/builder/PipelineBuilder.java | 100 ++++++++ .../builder/PipelineBuilderFactory.java | 35 +++ .../pipeline/read/AbstractFilter.java | 2 +- .../pipeline/read/AbstractPathFilter.java | 2 +- .../pipeline/read/Collector.java | 38 --- .../pipeline/read/CollectorFactory.java | 38 --- .../corepersistence/pipeline/read/Filter.java | 38 --- .../pipeline/read/FilterFactory.java | 69 ++++-- .../pipeline/read/FilterPipeline.java | 132 ----------- .../pipeline/read/ReadFilterFactoryImpl.java | 136 ----------- .../pipeline/read/ResultsPage.java | 10 +- .../read/collect/AbstractCollector.java | 46 ---- .../read/collect/ConnectionRefFilter.java | 68 ++++++ .../read/collect/ConnectionRefResumeFilter.java | 86 +++++++ .../read/collect/EntityResumeFilter.java | 3 +- 
.../read/collect/ResultsPageCollector.java | 35 ++- .../AbstractElasticSearchFilter.java | 171 -------------- .../pipeline/read/elasticsearch/Candidate.java | 55 ----- .../elasticsearch/CandidateEntityFilter.java | 234 ------------------- .../read/elasticsearch/CandidateIdFilter.java | 191 --------------- .../ElasticSearchCollectionFilter.java | 77 ------ .../ElasticSearchConnectionFilter.java | 73 ------ .../ElasticsearchCursorSerializer.java | 42 ---- .../read/elasticsearch/Elasticsearchdiagram.jpg | Bin 316655 -> 0 bytes .../graph/AbstractReadGraphEdgeByIdFilter.java | 82 ------- .../read/graph/AbstractReadGraphFilter.java | 147 ------------ .../read/graph/EdgeCursorSerializer.java | 42 ---- .../pipeline/read/graph/EntityIdFilter.java | 54 ----- .../pipeline/read/graph/EntityLoadFilter.java | 155 ------------ .../pipeline/read/graph/GraphDiagram.jpg | Bin 347711 -> 0 bytes .../graph/ReadGraphCollectionByIdFilter.java | 49 ---- .../read/graph/ReadGraphCollectionFilter.java | 53 ----- .../graph/ReadGraphConnectionByIdFilter.java | 50 ---- .../graph/ReadGraphConnectionByTypeFilter.java | 100 -------- .../read/graph/ReadGraphConnectionFilter.java | 53 ----- .../search/AbstractElasticSearchFilter.java | 169 ++++++++++++++ .../pipeline/read/search/Candidate.java | 55 +++++ .../read/search/CandidateEntityFilter.java | 232 ++++++++++++++++++ .../pipeline/read/search/CandidateIdFilter.java | 190 +++++++++++++++ .../search/ElasticsearchCursorSerializer.java | 40 ++++ .../read/search/Elasticsearchdiagram.jpg | Bin 0 -> 316655 bytes .../read/search/SearchCollectionFilter.java | 77 ++++++ .../read/search/SearchConnectionFilter.java | 72 ++++++ .../AbstractReadGraphEdgeByIdFilter.java | 82 +++++++ .../read/traverse/AbstractReadGraphFilter.java | 146 ++++++++++++ .../read/traverse/EdgeCursorSerializer.java | 42 ++++ .../pipeline/read/traverse/EntityIdFilter.java | 53 +++++ .../read/traverse/EntityLoadVerifyFilter.java | 154 ++++++++++++ 
.../pipeline/read/traverse/GraphDiagram.jpg | Bin 0 -> 347711 bytes .../traverse/ReadGraphCollectionByIdFilter.java | 49 ++++ .../traverse/ReadGraphCollectionFilter.java | 53 +++++ .../traverse/ReadGraphConnectionByIdFilter.java | 50 ++++ .../ReadGraphConnectionByTypeFilter.java | 99 ++++++++ .../traverse/ReadGraphConnectionFilter.java | 53 +++++ .../results/ObservableQueryExecutor.java | 2 +- .../pipeline/cursor/CursorTest.java | 4 +- 67 files changed, 2515 insertions(+), 2160 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java index 7a56631..be52547 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java @@ -38,8 +38,7 @@ import org.slf4j.LoggerFactory; import org.springframework.util.Assert; import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService; -import org.apache.usergrid.corepersistence.pipeline.read.CollectorFactory; -import org.apache.usergrid.corepersistence.pipeline.read.FilterFactory; +import org.apache.usergrid.corepersistence.pipeline.builder.PipelineBuilderFactory; import org.apache.usergrid.corepersistence.util.CpEntityMapUtils; import org.apache.usergrid.corepersistence.util.CpNamingUtils; import org.apache.usergrid.persistence.AggregateCounter; @@ -180,8 +179,7 @@ public class CpEntityManager implements EntityManager { private final AsyncEventService indexService; - private final FilterFactory filterFactory; - private final CollectorFactory collectorFactory; + private final 
PipelineBuilderFactory filterFactory; private boolean skipAggregateCounters; private MetricsFactory metricsFactory; @@ -223,7 +221,7 @@ public class CpEntityManager implements EntityManager { */ public CpEntityManager( final CassandraService cass, final CounterUtils counterUtils, final AsyncEventService indexService, final ManagerCache managerCache, final MetricsFactory metricsFactory, final EntityManagerFig entityManagerFig, - final FilterFactory filterFactory, final CollectorFactory collectorFactory, final UUID applicationId ) { + final PipelineBuilderFactory pipelineBuilderFactory, final UUID applicationId ) { this.entityManagerFig = entityManagerFig; @@ -232,10 +230,8 @@ public class CpEntityManager implements EntityManager { Preconditions.checkNotNull( managerCache, "managerCache must not be null" ); Preconditions.checkNotNull( applicationId, "applicationId must not be null" ); Preconditions.checkNotNull( indexService, "indexService must not be null" ); - Preconditions.checkNotNull( filterFactory, "filterFactory must not be null" ); - Preconditions.checkNotNull( collectorFactory, "collectorFactory must not be null" ); - this.filterFactory = filterFactory; - this.collectorFactory = collectorFactory; + Preconditions.checkNotNull( pipelineBuilderFactory, "filterFactory must not be null" ); + this.filterFactory = pipelineBuilderFactory; this.managerCache = managerCache; @@ -750,7 +746,7 @@ public class CpEntityManager implements EntityManager { Preconditions.checkNotNull( entityRef, "entityRef cannot be null" ); CpRelationManager relationManager = - new CpRelationManager( metricsFactory, managerCache, filterFactory, collectorFactory, indexService, this, entityManagerFig, applicationId, entityRef ); + new CpRelationManager( managerCache, filterFactory, indexService, this, entityManagerFig, applicationId, entityRef ); return relationManager; } 
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java index 5055538..baa1148 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java @@ -35,8 +35,7 @@ import org.apache.commons.lang.StringUtils; import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService; import org.apache.usergrid.corepersistence.index.ReIndexService; -import org.apache.usergrid.corepersistence.pipeline.read.CollectorFactory; -import org.apache.usergrid.corepersistence.pipeline.read.FilterFactory; +import org.apache.usergrid.corepersistence.pipeline.builder.PipelineBuilderFactory; import org.apache.usergrid.corepersistence.util.CpNamingUtils; import org.apache.usergrid.exception.ConflictException; import org.apache.usergrid.persistence.AbstractEntity; @@ -126,8 +125,7 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application private final EntityIndex entityIndex; private final MetricsFactory metricsFactory; private final AsyncEventService indexService; - private final FilterFactory filterFactory; - private final CollectorFactory collectorFactory; + private final PipelineBuilderFactory pipelineBuilderFactory; public CpEntityManagerFactory( final CassandraService cassandraService, final CounterUtils counterUtils, final Injector injector ) { @@ -141,8 +139,7 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application this.managerCache = injector.getInstance( ManagerCache.class ); this.metricsFactory = injector.getInstance( 
MetricsFactory.class ); this.indexService = injector.getInstance( AsyncEventService.class ); - this.filterFactory = injector.getInstance( FilterFactory.class ); - this.collectorFactory = injector.getInstance( CollectorFactory.class ); + this.pipelineBuilderFactory = injector.getInstance( PipelineBuilderFactory.class ); this.applicationIdCache = injector.getInstance(ApplicationIdCacheFactory.class).getInstance( getManagementEntityManager() ); @@ -203,7 +200,8 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application private EntityManager _getEntityManager( UUID applicationId ) { EntityManager em = new CpEntityManager(cassandraService, counterUtils, indexService, managerCache, metricsFactory, entityManagerFig, - filterFactory, collectorFactory, applicationId ); + + pipelineBuilderFactory, applicationId ); return em; } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java index 6201fe8..1c34929 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java @@ -31,9 +31,11 @@ import org.slf4j.LoggerFactory; import org.springframework.util.Assert; import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService; -import org.apache.usergrid.corepersistence.pipeline.read.CollectorFactory; -import org.apache.usergrid.corepersistence.pipeline.read.FilterFactory; -import org.apache.usergrid.corepersistence.pipeline.read.FilterPipeline; +import org.apache.usergrid.corepersistence.pipeline.FilterPipeline; +import 
org.apache.usergrid.corepersistence.pipeline.builder.EntityBuilder; +import org.apache.usergrid.corepersistence.pipeline.builder.IdBuilder; +import org.apache.usergrid.corepersistence.pipeline.builder.PipelineBuilderFactory; +import org.apache.usergrid.corepersistence.pipeline.read.FilterResult; import org.apache.usergrid.corepersistence.pipeline.read.ResultsPage; import org.apache.usergrid.corepersistence.results.ObservableQueryExecutor; import org.apache.usergrid.corepersistence.util.CpEntityMapUtils; @@ -52,7 +54,6 @@ import org.apache.usergrid.persistence.Schema; import org.apache.usergrid.persistence.SimpleEntityRef; import org.apache.usergrid.persistence.SimpleRoleRef; import org.apache.usergrid.persistence.cassandra.ConnectionRefImpl; -import org.apache.usergrid.persistence.core.metrics.MetricsFactory; import org.apache.usergrid.persistence.core.scope.ApplicationScope; import org.apache.usergrid.persistence.entities.Group; import org.apache.usergrid.persistence.entities.User; @@ -121,13 +122,12 @@ public class CpRelationManager implements RelationManager { private final AsyncEventService indexService; - private final FilterFactory filterFactory; - private final CollectorFactory collectorFactory; + private final PipelineBuilderFactory pipelineBuilderFactory; - public CpRelationManager( final MetricsFactory metricsFactory, final ManagerCache managerCache, - final FilterFactory filterFactory, final CollectorFactory collectorFactory, final AsyncEventService indexService, + public CpRelationManager( final ManagerCache managerCache, + final PipelineBuilderFactory pipelineBuilderFactory, final AsyncEventService indexService, final EntityManager em, final EntityManagerFig entityManagerFig, final UUID applicationId, final EntityRef headEntity ) { @@ -147,8 +147,7 @@ public class CpRelationManager implements RelationManager { this.managerCache = managerCache; this.applicationScope = CpNamingUtils.getApplicationScope( applicationId ); - this.filterFactory = 
filterFactory; - this.collectorFactory = collectorFactory; + this.pipelineBuilderFactory = pipelineBuilderFactory; if ( logger.isDebugEnabled() ) { logger.debug( "Loading head entity {}:{} from app {}", new Object[] { @@ -629,29 +628,23 @@ public class CpRelationManager implements RelationManager { query = adjustQuery( query ); - final FilterPipeline<Id> filterPipeline = new FilterPipeline( applicationScope, query.getCursor(), query.getLimit() ).withFilter( filterFactory.getEntityIdFilter( cpHeadEntity.getId() ) ); + final IdBuilder pipelineBuilder = + pipelineBuilderFactory.create( applicationScope ).withCursor( query.getCursor() ) + .withLimit( query.getLimit() ).fromId( cpHeadEntity.getId() ); - final FilterPipeline<org.apache.usergrid.persistence.model.entity.Entity> entityFilterPipeline; + final EntityBuilder results; if ( query.isGraphSearch() ) { - entityFilterPipeline = filterPipeline.withFilter( filterFactory.readGraphCollectionFilter( collectionName ) ) - .withFilter( filterFactory.entityLoadFilter() ); + results = pipelineBuilder.traverseCollection( collectionName ).loadEntities(); } else { final String entityType = collection.getType(); - - entityFilterPipeline = filterPipeline.withFilter( - filterFactory.elasticSearchCollectionFilter( query.getQl().get(), collectionName, entityType ) ) - .withFilter( filterFactory.candidateEntityFilter() ); + results = pipelineBuilder.searchCollection( collectionName, entityType, query.getQl().get() ).loadEntities(); } - final Observable<ResultsPage> resultsObservable = - entityFilterPipeline.withFilter( filterFactory.entityResumeFilter() ) - .withCollector( collectorFactory.getResultsPageCollector() ).execute(); - - return new ObservableQueryExecutor( resultsObservable ).next(); + return new ObservableQueryExecutor( results.build() ).next(); } @@ -923,7 +916,7 @@ public class CpRelationManager implements RelationManager { query = adjustQuery( query ); - final String entityType = query.getEntityType(); + final 
Optional<String> entityType = Optional.fromNullable( query.getEntityType() ) ; //set startid -- graph | es query filter -- load entities filter (verifies exists) --> results page collector // -> 1.0 results @@ -935,31 +928,57 @@ public class CpRelationManager implements RelationManager { // collector - final FilterPipeline<Id> filterPipeline = - new FilterPipeline( applicationScope, query.getCursor(), query.getLimit() ) - .withFilter( filterFactory.getEntityIdFilter( cpHeadEntity.getId() ) ); + final IdBuilder + pipelineBuilder = pipelineBuilderFactory.create( applicationScope ).withCursor( query.getCursor() ).withLimit( query.getLimit() ).fromId( + cpHeadEntity.getId() ); + + + + + if(query.getResultsLevel() == Level.REFS){ + final Observable<ResultsPage<ConnectionRef>> results; + + if(query.isGraphSearch()){ + + results = pipelineBuilder.traverseConnection( connection, entityType ).loadConnectionRefs( cpHeadEntity.getId(), connection ).build(); + + + } + else + { + results = pipelineBuilder.searchConnection( connection, query.getQl().get(),entityType) .loadIds().loadConnectionRefs( cpHeadEntity.getId(), connection ).build(); + + } + + throw new UnsupportedOperationException( "Implement me" ); + + } + + + + if(query.getResultsLevel() == Level.IDS){ + + throw new UnsupportedOperationException( "Not yet implemented" ); + } + + + //we want to load all entities + final Observable<ResultsPage<org.apache.usergrid.persistence.model.entity.Entity>> results; - final FilterPipeline<org.apache.usergrid.persistence.model.entity.Entity> entityFilterPipeline; if ( query.isGraphSearch() ) { - entityFilterPipeline = filterPipeline.withFilter( filterFactory.readGraphConnectionFilter( connection ) ) - .withFilter( filterFactory.entityLoadFilter() ); + results = pipelineBuilder.traverseConnection( connection, entityType ).loadEntities().build(); } else { - entityFilterPipeline = filterPipeline.withFilter( filterFactory - .elasticSearchConnectionFilter( query.getQl().get(), 
connection, Optional.fromNullable( entityType ) ) ) - .withFilter( filterFactory.candidateEntityFilter() ); + results = pipelineBuilder.searchConnection( connection, query.getQl().get() , entityType).loadEntities().build(); } - final Observable<ResultsPage> resultsObservable = - entityFilterPipeline.withFilter( filterFactory.entityResumeFilter() ) - .withCollector( collectorFactory.getResultsPageCollector() ).execute(); - return new ObservableQueryExecutor( resultsObservable ).next(); + return new ObservableQueryExecutor( results ).next(); } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/FilterPipeline.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/FilterPipeline.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/FilterPipeline.java new file mode 100644 index 0000000..089f47d --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/FilterPipeline.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.usergrid.corepersistence.pipeline; + + +import org.apache.usergrid.corepersistence.pipeline.cursor.RequestCursor; +import org.apache.usergrid.corepersistence.pipeline.read.FilterResult; +import org.apache.usergrid.persistence.core.scope.ApplicationScope; +import org.apache.usergrid.persistence.core.util.ValidationUtils; +import org.apache.usergrid.persistence.model.entity.Id; + +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; + +import rx.Observable; + + +/** + * Pipeline for applying our UG domain specific filters. + * + * Modeled after an observable, with typing to allow input of specific filters + * + * @param InputType the input type in the current pipeline state + */ +public class FilterPipeline<InputType> { + + + private int idCount = 0; + + private final ApplicationScope applicationScope; + + + private final RequestCursor requestCursor; + private int limit; + + //Generics hell, intentionally without a generic, we check at the filter level + private Observable currentObservable; + + + /** + * Create our filter pipeline + */ + public FilterPipeline( final ApplicationScope applicationScope, final Optional<String> cursor, final int limit ) { + + + ValidationUtils.validateApplicationScope( applicationScope ); + Preconditions.checkNotNull( cursor, "cursor optional is required" ); + Preconditions.checkArgument( limit > 0, "limit must be > 0" ); + + + this.applicationScope = applicationScope; + + //init our cursor to empty + this.requestCursor = new RequestCursor( cursor ); + + //set the default limit + this.limit = limit; + + //set our observable to start at the application + final FilterResult<Id> filter = new FilterResult<>( applicationScope.getApplication(), Optional.absent() ); + this.currentObservable = Observable.just( filter ); + } + + + public <OutputType> FilterPipeline<OutputType> withFilter( + final PipelineOperation<? super InputType, ? 
extends OutputType> filter ) { + + + + final PipelineContext context = new PipelineContext( applicationScope, requestCursor, limit, idCount ); + + filter.setContext( context ); + + //done for clarity + idCount++; + + return ( FilterPipeline<OutputType> ) this; + } + + + + /** + * Return the observable of the filter pipeline + */ + public Observable<InputType> execute() { + return currentObservable; + } + + +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineModule.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineModule.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineModule.java index ef696bd..8ec8704 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineModule.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineModule.java @@ -20,7 +20,6 @@ package org.apache.usergrid.corepersistence.pipeline; -import org.apache.usergrid.corepersistence.pipeline.read.CollectorFactory; import org.apache.usergrid.corepersistence.pipeline.read.FilterFactory; import com.google.inject.AbstractModule; @@ -44,6 +43,5 @@ public class PipelineModule extends AbstractModule { //Use Guice to create the builder since we don't really need to do anything //other than DI when creating the filters install( new FactoryModuleBuilder().build( FilterFactory.class ) ); - install( new FactoryModuleBuilder().build( CollectorFactory.class )); } } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineOperation.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineOperation.java 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineOperation.java index d2fa16c..3dda22e 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineOperation.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineOperation.java @@ -33,7 +33,7 @@ import rx.Observable; * @param <T> The input type of the filter value * @param <R> The output type of the filter value */ -public interface PipelineOperation<T, R> extends Observable.Transformer<FilterResult<T>, R> { +public interface PipelineOperation<T, R> extends Observable.Transformer<T, R> { void setContext(final PipelineContext pipelineContext); } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/CandidateBuilder.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/CandidateBuilder.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/CandidateBuilder.java new file mode 100644 index 0000000..5cb2eab --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/CandidateBuilder.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.corepersistence.pipeline.builder; + + +import org.apache.usergrid.corepersistence.pipeline.read.FilterFactory; +import org.apache.usergrid.corepersistence.pipeline.FilterPipeline; +import org.apache.usergrid.corepersistence.pipeline.read.FilterResult; +import org.apache.usergrid.corepersistence.pipeline.read.search.Candidate; +import org.apache.usergrid.persistence.model.entity.Entity; +import org.apache.usergrid.persistence.model.entity.Id; + + +public class CandidateBuilder { + + + private final FilterPipeline<FilterResult<Candidate>> filterPipeline; + private final FilterFactory filterFactory; + + + public CandidateBuilder( final FilterPipeline<FilterResult<Candidate>> filterPipeline, + final FilterFactory filterFactory ) { + this.filterPipeline = filterPipeline; + this.filterFactory = filterFactory; + } + + + /** + * Validates all candidates for the versions by id and sets them + * @return + */ + public IdBuilder loadIds(){ + + final FilterPipeline<FilterResult<Id>> newFilter = filterPipeline.withFilter( filterFactory.candidateResultsIdVerifyFilter() ); + + return new IdBuilder( newFilter, filterFactory ); + } + + + /** + * Load all the candidates as entities and return them + * @return + */ + public EntityBuilder loadEntities(){ + + final FilterPipeline<FilterResult<Entity>> newFilter = filterPipeline.withFilter( filterFactory.candidateEntityFilter() ); + + return new EntityBuilder(newFilter ); + } +} 
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/ConnectionBuilder.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/ConnectionBuilder.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/ConnectionBuilder.java new file mode 100644 index 0000000..b4ea94e --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/ConnectionBuilder.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.usergrid.corepersistence.pipeline.builder; + + +import org.apache.usergrid.corepersistence.pipeline.read.ResultsPage; +import org.apache.usergrid.persistence.ConnectionRef; + +import rx.Observable; + + +public class ConnectionBuilder { + + + + public Observable<ResultsPage<ConnectionRef>> build(){ + return null; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/ConnectionRefBuilder.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/ConnectionRefBuilder.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/ConnectionRefBuilder.java new file mode 100644 index 0000000..6c0ebc8 --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/ConnectionRefBuilder.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.usergrid.corepersistence.pipeline.builder; + + +import org.apache.usergrid.corepersistence.pipeline.FilterPipeline; +import org.apache.usergrid.corepersistence.pipeline.read.FilterResult; +import org.apache.usergrid.corepersistence.pipeline.read.ResultsPage; +import org.apache.usergrid.corepersistence.pipeline.read.collect.ResultsPageCollector; +import org.apache.usergrid.persistence.ConnectionRef; + +import rx.Observable; + + +/** + * A 1.0 compatibility state. Should be removed as services are refactored + */ +@Deprecated +public class ConnectionRefBuilder { + + + private final FilterPipeline<FilterResult<ConnectionRef>> connectionRefFilter; + + public ConnectionRefBuilder( final FilterPipeline<FilterResult<ConnectionRef>> connectionRefFilter ) { + this.connectionRefFilter = connectionRefFilter; + } + + + /** + * Build our connection refs observable + * @return + */ + public Observable<ResultsPage<ConnectionRef>> build(){ + return connectionRefFilter.withFilter( new ResultsPageCollector<>() ).execute(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/EntityBuilder.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/EntityBuilder.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/EntityBuilder.java new file mode 100644 index 0000000..07b4586 --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/EntityBuilder.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.builder;
+
+
+import org.apache.usergrid.corepersistence.pipeline.FilterPipeline;
+import org.apache.usergrid.corepersistence.pipeline.read.FilterResult;
+import org.apache.usergrid.corepersistence.pipeline.read.ResultsPage;
+import org.apache.usergrid.corepersistence.pipeline.read.collect.ResultsPageCollector;
+import org.apache.usergrid.persistence.model.entity.Entity;
+
+import rx.Observable;
+
+
+/**
+ * Builder to build our entity state
+ */
+public class EntityBuilder {
+
+    private final FilterPipeline<FilterResult<Entity>> filterPipeline;
+
+    public EntityBuilder( final FilterPipeline<FilterResult<Entity>> filterPipeline ) {
+        this.filterPipeline = filterPipeline;
+    }
+
+    /**
+     * Build our results of entities by appending a results page collector to the pipeline and executing it
+     * @return The observable of result pages produced by executing the pipeline
+     */
+    public Observable<ResultsPage<Entity>> build(){
+        //NOTE(review): IdBuilder.loadConnectionRefs adds a resume filter before collecting; confirm whether entities need one
+        return filterPipeline.withFilter( new ResultsPageCollector<>() ).execute();
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/IdBuilder.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/IdBuilder.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/IdBuilder.java
new file mode 100644
index 0000000..12a89ba
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/IdBuilder.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.builder;
+
+
+import org.apache.usergrid.corepersistence.pipeline.PipelineOperation;
+import org.apache.usergrid.corepersistence.pipeline.read.FilterFactory;
+import org.apache.usergrid.corepersistence.pipeline.FilterPipeline;
+import org.apache.usergrid.corepersistence.pipeline.read.FilterResult;
+import org.apache.usergrid.corepersistence.pipeline.read.collect.ConnectionRefFilter;
+import org.apache.usergrid.corepersistence.pipeline.read.collect.ConnectionRefResumeFilter;
+import org.apache.usergrid.corepersistence.pipeline.read.search.Candidate;
+import org.apache.usergrid.corepersistence.pipeline.read.traverse.ReadGraphConnectionByTypeFilter;
+import org.apache.usergrid.persistence.ConnectionRef;
+import org.apache.usergrid.persistence.model.entity.Entity;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.google.common.base.Optional;
+
+
+/**
+ * Builder for the point in a pipeline where Ids are emitted. Each method appends a filter and returns the next builder
+ */
+public class IdBuilder {
+
+
+    private final FilterPipeline<FilterResult<Id>> filterPipeline;
+    private final FilterFactory filterFactory;
+
+
+    public IdBuilder( final FilterPipeline<FilterResult<Id>> filterPipeline, final FilterFactory filterFactory ) {
+        this.filterFactory = filterFactory;
+        this.filterPipeline = filterPipeline;
+    }
+
+
+    /**
+     * Append the entity load filter so the ids we encounter when traversing the graph are emitted as entities
+     * @return A builder over the pipeline of entities
+     */
+    public EntityBuilder loadEntities() {
+        final FilterPipeline<FilterResult<Entity>> entities =
+            filterPipeline.withFilter( filterFactory.entityLoadFilter() );
+
+        return new EntityBuilder( entities );
+    }
+
+
+    /**
+     * Traverse the edges of the named collection from our input Id
+     * @param collectionName The name of the collection handed to the collection filter
+     * @return A builder over the pipeline extended with the collection filter
+     */
+    public IdBuilder traverseCollection( final String collectionName ) {
+        final FilterPipeline<FilterResult<Id>> collectionIds =
+            filterPipeline.withFilter( filterFactory.readGraphCollectionFilter( collectionName ) );
+
+        return new IdBuilder( collectionIds, filterFactory );
+    }
+
+
+    /**
+     * Traverse the edges of the named connection from our input Id
+     * @param connectionName The name of the connection
+     * @param entityType The optional type of the entity. When present the by-type connection filter is used
+     * @return A builder over the pipeline extended with the chosen connection filter
+     */
+    public IdBuilder traverseConnection( final String connectionName, final Optional<String> entityType ) {
+
+        //use the by-type filter only when an entity type was supplied,
+        //otherwise the filter that takes just the connection name
+        final PipelineOperation<FilterResult<Id>, FilterResult<Id>> edgeFilter = entityType.isPresent()
+            ? filterFactory.readGraphConnectionByTypeFilter( connectionName, entityType.get() )
+            : filterFactory.readGraphConnectionFilter( connectionName );
+
+
+        final FilterPipeline<FilterResult<Id>> connected = filterPipeline.withFilter( edgeFilter );
+
+        return new IdBuilder( connected, filterFactory );
+    }
+
+
+    /**
+     * Search the named collection from our input Id with the specified criteria
+     * @param collectionName The name of the collection
+     * @param ql The user's query to execute
+     * @param entityType The type of the entity
+     * @return Candidate results
+     */
+    public CandidateBuilder searchCollection( final String collectionName, final String ql, final String entityType ) {
+
+        final FilterPipeline<FilterResult<Candidate>> candidates =
+            filterPipeline.withFilter( filterFactory.searchCollectionFilter( ql, collectionName, entityType ) );
+
+        return new CandidateBuilder( candidates, filterFactory );
+    }
+
+
+    /**
+     * Search the named connection from our input Id with the specified criteria
+     * @param connectionName The connection name to search
+     * @param ql The query to execute
+     * @param entityType The optional type of entity. If this is absent, all entity types in the connection will be searched
+     * @return Candidate results
+     */
+    public CandidateBuilder searchConnection( final String connectionName, final String ql, final Optional<String> entityType ) {
+
+        //the Optional is handed to the search filter as-is
+        final FilterPipeline<FilterResult<Candidate>> candidates =
+            filterPipeline.withFilter( filterFactory.searchConnectionFilter( ql, connectionName, entityType ) );
+
+        return new CandidateBuilder( candidates, filterFactory );
+    }
+
+
+    /**
+     * Create connection refs from our ids, then apply the connection ref resume filter. This is a legacy operation
+     * @param sourceId Handed to the ConnectionRefFilter; presumably the source entity of every ref (confirm there)
+     * @param connectionType Handed to the ConnectionRefFilter; presumably the connection type of every ref (confirm there)
+     * @return A builder over the pipeline of connection refs
+     */
+    @Deprecated
+    public ConnectionRefBuilder loadConnectionRefs( final Id sourceId, final String connectionType ) {
+        //NOTE(review): both filters are constructed directly here although FilterFactory declares factory methods for them
+        final FilterPipeline<FilterResult<ConnectionRef>> refs =
+            filterPipeline.withFilter( new ConnectionRefFilter( sourceId, connectionType ) )
+                          .withFilter( new ConnectionRefResumeFilter() );
+        return new ConnectionRefBuilder( refs );
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/PipelineBuilder.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/PipelineBuilder.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/PipelineBuilder.java
new file mode 100644
index 0000000..488e9c1
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/PipelineBuilder.java
@@ -0,0 +1,100 @@
+/*
+ *
Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.corepersistence.pipeline.builder; + + +import org.apache.usergrid.corepersistence.pipeline.read.FilterFactory; +import org.apache.usergrid.corepersistence.pipeline.FilterPipeline; +import org.apache.usergrid.corepersistence.pipeline.read.FilterResult; +import org.apache.usergrid.persistence.core.scope.ApplicationScope; +import org.apache.usergrid.persistence.model.entity.Id; + +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; +import com.google.inject.Inject; +import com.google.inject.assistedinject.Assisted; + + +/** + * This is our root builder to build filter pipelines. 
All operations should start with an instance of this class, and compose
+ * graph operations by traversing various builders to create our filter pipeline
+ */
+public class PipelineBuilder {
+
+    private final ApplicationScope applicationScope;
+    private Optional<String> cursor = Optional.absent();
+    private int limit = 10;
+    private final FilterFactory filterFactory;
+
+
+    /**
+     * Create a root builder. Until changed, the cursor is absent and the limit is 10
+     * @param filterFactory The factory used to create the filters added to the pipeline
+     * @param applicationScope The application scope the pipeline is created with
+     */
+    @Inject
+    public PipelineBuilder( final FilterFactory filterFactory, @Assisted final ApplicationScope applicationScope ) {
+        this.filterFactory = filterFactory;
+        this.applicationScope = applicationScope;
+    }
+
+
+    /**
+     * Set the cursor to use in our filter pipeline
+     * @param cursor The cursor to use. Must not be null, pass an absent Optional when there is no cursor
+     * @return This builder
+     */
+    public PipelineBuilder withCursor(final Optional<String> cursor){
+        Preconditions.checkNotNull(cursor, "cursor must not be null");
+        this.cursor = cursor;
+        return this;
+    }
+
+
+    /**
+     * Set our limit
+     * @param limit The limit to use, must be greater than 0
+     * @return This builder
+     * @throws IllegalArgumentException if limit is not greater than 0
+     */
+    public PipelineBuilder withLimit(final int limit){
+        //reject a bad limit where it is supplied instead of leaving it to whatever consumes the pipeline
+        Preconditions.checkArgument( limit > 0, "limit must be > 0" );
+        this.limit = limit;
+        return this;
+    }
+
+
+    /**
+     * Set our start point in our graph traversal to the specified entity id. A 1.0 compatibility API. Eventually this should be replaced with
+     * a call that will allow us to start traversing at the application node to any other node in the graph
+     *
+     * @param entityId The id handed to the entity id filter that starts the pipeline
+     * @return An IdBuilder over a new pipeline created with our scope, cursor and limit
+     */
+    @Deprecated
+    public IdBuilder fromId(final Id entityId){
+        //NOTE(review): FilterPipeline is instantiated as a raw type, so this assignment is unchecked; add type arguments once confirmed
+        FilterPipeline<FilterResult<Id>> filterPipeline = new FilterPipeline( applicationScope, this.cursor,limit ).withFilter( filterFactory.getEntityIdFilter( entityId ) );
+
+        return new IdBuilder( filterPipeline, filterFactory );
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/PipelineBuilderFactory.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/PipelineBuilderFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/PipelineBuilderFactory.java
new file mode 100644
index 0000000..6cb515b
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/PipelineBuilderFactory.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.builder;
+
+
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+
+/** Factory that creates a {@link PipelineBuilder} for an application scope */
+public interface PipelineBuilderFactory {
+
+
+    /**
+     * Create the root builder used to compose a filter pipeline
+     * @param applicationScope The application scope to build the pipeline for
+     * @return A pipeline builder for that scope
+     */
+    PipelineBuilder create( ApplicationScope applicationScope );
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractFilter.java
index e4d5d44..64cf67f 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractFilter.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractFilter.java
@@ -29,7 +29,7 @@ import org.apache.usergrid.corepersistence.pipeline.PipelineOperation;
  * @param <T> the input type
  * @param <R> The output Type
  */
-public abstract class AbstractFilter<T, R> implements Filter<T, R> {
+public abstract class AbstractFilter<T, R> implements PipelineOperation<T, R> {
 
     protected PipelineContext pipelineContext;
 
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractPathFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractPathFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractPathFilter.java
index c68dc4a..6dc4561 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractPathFilter.java
+++
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractPathFilter.java @@ -33,7 +33,7 @@ import com.google.common.base.Optional; * @param <R> The response type * @param <C> The cursor type */ -public abstract class AbstractPathFilter<T, R, C extends Serializable> extends AbstractFilter<T, R> implements Filter<T, R> { +public abstract class AbstractPathFilter<T, R, C extends Serializable> extends AbstractFilter<FilterResult<T>, FilterResult<R>> { http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/Collector.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/Collector.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/Collector.java deleted file mode 100644 index e28ce44..0000000 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/Collector.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.usergrid.corepersistence.pipeline.read; - - -import org.apache.usergrid.corepersistence.pipeline.PipelineOperation; - - -/** - * A command that is used to reduce our stream of results into a stream of final batch outputs. When used - * no further transformation or encoding should occur. Otherwise EdgePath data will be lost, and serialization cannot occur - * across requests - * - * @param <T> The input type - * @param <R> The output type - */ -public interface Collector<T, R> extends PipelineOperation<T,R> { - - - -} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/CollectorFactory.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/CollectorFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/CollectorFactory.java deleted file mode 100644 index dd200b5..0000000 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/CollectorFactory.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.usergrid.corepersistence.pipeline.read; - - -import org.apache.usergrid.corepersistence.pipeline.read.collect.ResultsPageCollector; - - -/** - * A factory for generating collectors - */ -public interface CollectorFactory { - - - /** - * Get the results page collector - * @return - */ - ResultsPageCollector getResultsPageCollector(); - -} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/Filter.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/Filter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/Filter.java deleted file mode 100644 index ee01602..0000000 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/Filter.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.usergrid.corepersistence.pipeline.read; - - -import org.apache.usergrid.corepersistence.pipeline.PipelineOperation; - - -/** - * Traverses edges in the graph. Either by query or graph traversal. 
Take an observable of FilterResult, and emits - * an observable of FilterResults. Filters should never emit groups or objects that represent collections. Items should - * always be emitted 1 at a time. It is the responsibility of the collector to aggregate results. - */ -public interface Filter<T, R> extends PipelineOperation<T, FilterResult<R>> { - - /** - * Get the builder for the next phase - * @return - */ -// B getNextBuilder(); -} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterFactory.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterFactory.java index d297c2a..ca5695c 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterFactory.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterFactory.java @@ -20,18 +20,20 @@ package org.apache.usergrid.corepersistence.pipeline.read; +import org.apache.usergrid.corepersistence.pipeline.read.collect.ConnectionRefFilter; +import org.apache.usergrid.corepersistence.pipeline.read.collect.ConnectionRefResumeFilter; import org.apache.usergrid.corepersistence.pipeline.read.collect.EntityResumeFilter; -import org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.CandidateEntityFilter; -import org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.CandidateIdFilter; -import org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.ElasticSearchCollectionFilter; -import org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.ElasticSearchConnectionFilter; -import org.apache.usergrid.corepersistence.pipeline.read.graph.EntityIdFilter; -import 
org.apache.usergrid.corepersistence.pipeline.read.graph.EntityLoadFilter; -import org.apache.usergrid.corepersistence.pipeline.read.graph.ReadGraphCollectionByIdFilter; -import org.apache.usergrid.corepersistence.pipeline.read.graph.ReadGraphCollectionFilter; -import org.apache.usergrid.corepersistence.pipeline.read.graph.ReadGraphConnectionByIdFilter; -import org.apache.usergrid.corepersistence.pipeline.read.graph.ReadGraphConnectionByTypeFilter; -import org.apache.usergrid.corepersistence.pipeline.read.graph.ReadGraphConnectionFilter; +import org.apache.usergrid.corepersistence.pipeline.read.search.CandidateEntityFilter; +import org.apache.usergrid.corepersistence.pipeline.read.search.CandidateIdFilter; +import org.apache.usergrid.corepersistence.pipeline.read.search.SearchCollectionFilter; +import org.apache.usergrid.corepersistence.pipeline.read.search.SearchConnectionFilter; +import org.apache.usergrid.corepersistence.pipeline.read.traverse.EntityIdFilter; +import org.apache.usergrid.corepersistence.pipeline.read.traverse.EntityLoadVerifyFilter; +import org.apache.usergrid.corepersistence.pipeline.read.traverse.ReadGraphCollectionByIdFilter; +import org.apache.usergrid.corepersistence.pipeline.read.traverse.ReadGraphCollectionFilter; +import org.apache.usergrid.corepersistence.pipeline.read.traverse.ReadGraphConnectionByIdFilter; +import org.apache.usergrid.corepersistence.pipeline.read.traverse.ReadGraphConnectionByTypeFilter; +import org.apache.usergrid.corepersistence.pipeline.read.traverse.ReadGraphConnectionFilter; import org.apache.usergrid.persistence.model.entity.Id; import com.google.common.base.Optional; @@ -90,10 +92,9 @@ public interface FilterFactory { * @param query The query to use when querying the entities in the collection * @param collectionName The collection name to use when querying */ - ElasticSearchCollectionFilter elasticSearchCollectionFilter( @Assisted( "query" ) final String query, - @Assisted( "collectionName" ) - final String 
collectionName, - @Assisted( "entityType" ) final String entityType ); + SearchCollectionFilter searchCollectionFilter( @Assisted( "query" ) final String query, + @Assisted( "collectionName" ) final String collectionName, + @Assisted( "entityType" ) final String entityType ); /** @@ -103,17 +104,16 @@ public interface FilterFactory { * @param connectionName The type of connection to query * @param connectedEntityType The type of entity in the connection. Leave absent to query all entity types */ - ElasticSearchConnectionFilter elasticSearchConnectionFilter( @Assisted( "query" ) final String query, - @Assisted( "connectionName" ) - final String connectionName, - @Assisted( "connectedEntityType" ) - final Optional<String> connectedEntityType ); + SearchConnectionFilter searchConnectionFilter( @Assisted( "query" ) final String query, + @Assisted( "connectionName" ) final String connectionName, + @Assisted( "connectedEntityType" ) + final Optional<String> connectedEntityType ); /** * Generate a new instance of the command with the specified parameters */ - EntityLoadFilter entityLoadFilter(); + EntityLoadVerifyFilter entityLoadFilter(); /** * Get the collector for collection candidate results to entities @@ -127,16 +127,37 @@ public interface FilterFactory { CandidateIdFilter candidateResultsIdVerifyFilter(); /** - * Get an entity id filter. Used as a 1.0->2.0 bridge since we're not doing full traversals - * * @param entityId The entity id to emit + * + * @deprecated A 1.0 api + * + * Get an entity id filter. 
Used as a 1.0->2.0 bridge since we're not doing full traversals */ + @Deprecated EntityIdFilter getEntityIdFilter( final Id entityId ); /** * Create a new instance of our entity filter - * @return */ EntityResumeFilter entityResumeFilter(); + + /** + * @deprecated A 1.0 api. Create a filter for resuming connection references + */ + @Deprecated + ConnectionRefResumeFilter connectionRefResumeFilter(); + + /** + * + * Creates connection refs for 1.0 compatibility + * + * @param sourceId The source id + * @param connectionType The connection type + * + * @deprecated A 1.0 api. Create a filter for transforming incoming ids into connection refs + */ + @Deprecated + ConnectionRefFilter connectionRefFilter( @Assisted( "sourceId" ) final Id sourceId, + @Assisted( "connectionType" ) final String connectionType ); } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterPipeline.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterPipeline.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterPipeline.java deleted file mode 100644 index f8bbdd8..0000000 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterPipeline.java +++ /dev/null @@ -1,132 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License.
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.usergrid.corepersistence.pipeline.read; - - -import org.apache.usergrid.corepersistence.pipeline.PipelineContext; -import org.apache.usergrid.corepersistence.pipeline.PipelineOperation; -import org.apache.usergrid.corepersistence.pipeline.cursor.RequestCursor; -import org.apache.usergrid.persistence.core.scope.ApplicationScope; -import org.apache.usergrid.persistence.core.util.ValidationUtils; -import org.apache.usergrid.persistence.model.entity.Id; - -import com.google.common.base.Optional; -import com.google.common.base.Preconditions; - -import rx.Observable; - - -/** - * Pipeline for applying our UG domain specific filters. 
- * - * Modeled after an observable, with typing to allow input of specific filters - * - * @param InputType the input type in the current pipeline state - */ -public class FilterPipeline<InputType> { - - - private int idCount = 0; - - private final ApplicationScope applicationScope; - - - private final RequestCursor requestCursor; - private int limit; - - //Generics hell, intentionally without a generic, we check at the filter level - private Observable currentObservable; - - - /** - * Create our filter pipeline - */ - public FilterPipeline( final ApplicationScope applicationScope, final Optional<String> cursor, final int limit ) { - - - ValidationUtils.validateApplicationScope( applicationScope ); - Preconditions.checkNotNull( cursor, "cursor optional is required" ); - Preconditions.checkArgument( limit > 0, "limit must be > 0" ); - - - this.applicationScope = applicationScope; - - //init our cursor to empty - this.requestCursor = new RequestCursor( cursor ); - - //set the default limit - this.limit = limit; - - //set our observable to start at the application - final FilterResult<Id> filter = new FilterResult<>( applicationScope.getApplication(), Optional.absent() ); - this.currentObservable = Observable.just( filter ); - } - - - public <OutputType> FilterPipeline<OutputType> withFilter( - final Filter<? super InputType, ? extends OutputType> filter ) { - - - setUp( filter ); - - return ( FilterPipeline<OutputType> ) this; - } - - - public <OutputType> FilterPipeline<OutputType> withCollector( - final Collector<? super InputType, ? extends OutputType> collector ) { - - - setUp( collector ); - - return ( FilterPipeline<OutputType> ) this; - } - - - private <OutputType> void setUp( - final PipelineOperation<? super InputType, ? 
extends OutputType> pipelineOperation ) { - setState( pipelineOperation ); - - currentObservable = currentObservable.compose( pipelineOperation ); - } - - - /** - * Return the observable of the filter pipeline - */ - public Observable<InputType> execute() { - return currentObservable; - } - - - /** - * Set the id of the state - */ - private void setState( final PipelineOperation pipelineOperation ) { - - - final PipelineContext context = new PipelineContext( applicationScope, requestCursor, limit, idCount ); - - pipelineOperation.setContext( context ); - - //done for clarity - idCount++; - } -} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadFilterFactoryImpl.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadFilterFactoryImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadFilterFactoryImpl.java deleted file mode 100644 index 0f73fb9..0000000 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadFilterFactoryImpl.java +++ /dev/null @@ -1,136 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. 
See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.usergrid.corepersistence.pipeline.read; - - -import com.google.inject.Singleton; - - -@Singleton -public class ReadFilterFactoryImpl { //implements ReadFilterFactory { - -// -// private final GraphManagerFactory graphManagerFactory; -// private final EntityIndexFactory entityIndexFactory; -// private final EntityCollectionManagerFactory entityCollectionManagerFactory; -// -// -// @Inject -// public ReadFilterFactoryImpl( final GraphManagerFactory graphManagerFactory, -// final EntityIndexFactory entityIndexFactory, -// final EntityCollectionManagerFactory entityCollectionManagerFactory ) { -// -// -// this.graphManagerFactory = graphManagerFactory; -// this.entityIndexFactory = entityIndexFactory; -// this.entityCollectionManagerFactory = entityCollectionManagerFactory; -// } -// -// -// @Override -// public ReadGraphCollectionFilter readGraphCollectionCommand( final String collectionName ) { -// return new ReadGraphCollectionFilter( graphManagerFactory, collectionName ); -// } -// -// -// @Override -// public ReadGraphCollectionByIdFilter readGraphCollectionByIdFilter( final String collectionName, -// final Id targetId ) { -// return new ReadGraphCollectionByIdFilter( graphManagerFactory, collectionName, targetId ); -// } -// -// -// @Override -// public ReadGraphConnectionFilter readGraphConnectionCommand( final String connectionName ) { -// return new ReadGraphConnectionFilter( graphManagerFactory, connectionName ); -// } -// -// -// @Override -// public ReadGraphConnectionByTypeFilter readGraphConnectionCommand( final String connectionName, -// final String entityType ) { -// return new ReadGraphConnectionByTypeFilter( graphManagerFactory, connectionName, entityType ); -// } -// -// -// @Override -// public ReadGraphConnectionByIdFilter readGraphConnectionByIdFilter( final String connectionName, -// final Id targetId ) { -// 
return new ReadGraphConnectionByIdFilter( graphManagerFactory, connectionName, targetId ); -// } -// -// -// @Override -// public EntityLoadCollector entityLoadCollector() { -// return new EntityLoadCollector( entityCollectionManagerFactory ); -// } -// -// -// /** -// * TODO refactor these impls to use RX internally, as well as remove the query object -// */ -// @Override -// public QueryCollectionElasticSearchCollectorFilter queryCollectionElasticSearchCollector( -// final String collectionName, final String query ) { -// -// final Query queryObject = Query.fromQL( query ); -// -// final QueryCollectionElasticSearchCollectorFilter filter = -// new QueryCollectionElasticSearchCollectorFilter( entityCollectionManagerFactory, entityIndexFactory, -// collectionName, queryObject ); -// -// return filter; -// } -// -// -// @Override -// public QueryConnectionElasticSearchCollectorFilter queryConnectionElasticSearchCollector( -// final String connectionName, final String query ) { -// -// final Query queryObject = Query.fromQL( query ); -// -// final QueryConnectionElasticSearchCollectorFilter filter = -// new QueryConnectionElasticSearchCollectorFilter( entityCollectionManagerFactory, entityIndexFactory, -// connectionName, queryObject ); -// -// return filter; -// } -// -// -// @Override -// public QueryConnectionElasticSearchCollectorFilter queryConnectionElasticSearchCollector( -// final String connectionName, final String connectionEntityType, final String query ) { -// -// final Query queryObject = Query.fromQL( query ); -// queryObject.setConnectionType( connectionEntityType ); -// -// final QueryConnectionElasticSearchCollectorFilter filter = -// new QueryConnectionElasticSearchCollectorFilter( entityCollectionManagerFactory, entityIndexFactory, -// connectionName, queryObject ); -// -// return filter; -// } -// -// -// @Override -// public EntityIdFilter getEntityIdFilter( final Id entityId ) { -// return new EntityIdFilter( entityId ); -// } -} 
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ResultsPage.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ResultsPage.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ResultsPage.java index 1810d65..6b3a086 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ResultsPage.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ResultsPage.java @@ -29,24 +29,26 @@ import org.apache.usergrid.persistence.model.entity.Entity; /** * An encapsulation of entities as a group of responses. Ordered by the requesting filters. Each set should be * considered a "page" of results. A hold over from 1.0. We shouldn't need this when we fully move away from the EM/RM + * + * @param <T> the type of results page */ -public class ResultsPage { +public class ResultsPage<T> { - private final List<Entity> entityList; + private final List<T> entityList; private final int limit; private final ResponseCursor responseCursor; - public ResultsPage( final List<Entity> entityList, final ResponseCursor responseCursor, final int limit ) { + public ResultsPage( final List<T> entityList, final ResponseCursor responseCursor, final int limit ) { this.entityList = entityList; this.responseCursor = responseCursor; this.limit = limit; } - public List<Entity> getEntityList() { + public List<T> getEntityList() { return entityList; } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/AbstractCollector.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/AbstractCollector.java
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/AbstractCollector.java deleted file mode 100644 index 1c5175d..0000000 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/AbstractCollector.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.usergrid.corepersistence.pipeline.read.collect; - - -import org.apache.usergrid.corepersistence.pipeline.PipelineContext; -import org.apache.usergrid.corepersistence.pipeline.read.Collector; -import org.apache.usergrid.corepersistence.pipeline.read.Filter; - - -/** - * Basic functionality for our commands to handle cursor IO - * @param <T> the input type - * @param <R> The output Type - */ -public abstract class AbstractCollector<T, R> implements Collector<T, R> { - - - protected PipelineContext pipelineContext; - - - @Override - public void setContext( final PipelineContext pipelineContext ) { - this.pipelineContext = pipelineContext; - } - - - -} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/ConnectionRefFilter.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/ConnectionRefFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/ConnectionRefFilter.java new file mode 100644 index 0000000..392e33a --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/ConnectionRefFilter.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.corepersistence.pipeline.read.collect; + + +import org.apache.usergrid.corepersistence.pipeline.read.AbstractFilter; +import org.apache.usergrid.corepersistence.pipeline.read.EdgePath; +import org.apache.usergrid.corepersistence.pipeline.read.FilterResult; +import org.apache.usergrid.persistence.ConnectionRef; +import org.apache.usergrid.persistence.cassandra.ConnectionRefImpl; +import org.apache.usergrid.persistence.model.entity.Id; + +import com.google.common.base.Optional; +import com.google.inject.Inject; +import com.google.inject.assistedinject.Assisted; + +import rx.Observable; + + +/** + * This class only exists for 1.0 compatibility, remove once services no longer need connection refs + */ +public class ConnectionRefFilter extends AbstractFilter<FilterResult<Id>, FilterResult<ConnectionRef>> { + + + private final Id sourceId; + private final String connectionType; + + + @Inject + public ConnectionRefFilter( @Assisted( "sourceId" ) final Id sourceId, + @Assisted( "connectionType" ) final String connectionType ) { + this.sourceId = sourceId; + this.connectionType = connectionType; + } + + + @Override + public Observable<FilterResult<ConnectionRef>> call( final Observable<FilterResult<Id>> filterResultObservable ) { + + return filterResultObservable.map( targetResult -> { + + final Id targetId = targetResult.getValue(); + final ConnectionRef ref = + new ConnectionRefImpl( sourceId.getType(), sourceId.getUuid(), connectionType, targetId.getType(), + targetId.getUuid() ); + + return new FilterResult<>( ref, 
Optional.<EdgePath>absent() ); + } ); + } +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/ConnectionRefResumeFilter.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/ConnectionRefResumeFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/ConnectionRefResumeFilter.java new file mode 100644 index 0000000..5c3a93e --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/ConnectionRefResumeFilter.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.usergrid.corepersistence.pipeline.read.collect; + + +import java.util.UUID; + +import org.apache.usergrid.corepersistence.pipeline.cursor.CursorSerializer; +import org.apache.usergrid.corepersistence.pipeline.read.AbstractPathFilter; +import org.apache.usergrid.corepersistence.pipeline.read.FilterResult; +import org.apache.usergrid.persistence.ConnectedEntityRef; +import org.apache.usergrid.persistence.ConnectionRef; +import org.apache.usergrid.persistence.model.entity.Id; +import org.apache.usergrid.persistence.model.entity.SimpleId; + +import com.google.common.base.Optional; + +import rx.Observable; + + +/** + * A filter that is used when we can potentially serialize pages via cursor. This will filter the first result, only if + * it matches the Id that was set. This is a 1.0 compatibility implementation, and should be removed when services no + * longer depend on connection refs + */ +public class ConnectionRefResumeFilter extends AbstractPathFilter<ConnectionRef, ConnectionRef, Id> { + + + @Override + public Observable<FilterResult<ConnectionRef>> call( + final Observable<FilterResult<ConnectionRef>> filterResultObservable ) { + + //filter only the first id, then map into our path for our next pass + + + return filterResultObservable.skipWhile( filterResult -> { + + final Optional<Id> startFromCursor = getSeekValue(); + + + if ( !startFromCursor.isPresent() ) { + return false; + } + + final ConnectedEntityRef ref = filterResult.getValue().getTargetRefs(); + + final Id entityId = startFromCursor.get(); + + return entityId.getUuid().equals( ref.getUuid() ) && entityId.getType().equals( ref.getType() ); + } ).map( filterResult -> { + + + final ConnectionRef entity = filterResult.getValue(); + + final String type = entity.getTargetRefs().getType(); + final UUID uuid = entity.getTargetRefs().getUuid(); + + final Id entityId = new SimpleId( uuid, type ); + + return createFilterResult( entity, entityId, filterResult.getPath() ); + } ); + }
+ + + @Override + protected CursorSerializer<Id> getCursorSerializer() { + return IdCursorSerializer.INSTANCE; + } +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/EntityResumeFilter.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/EntityResumeFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/EntityResumeFilter.java index 2917b61..f545631 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/EntityResumeFilter.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/EntityResumeFilter.java @@ -22,7 +22,6 @@ package org.apache.usergrid.corepersistence.pipeline.read.collect; import org.apache.usergrid.corepersistence.pipeline.cursor.CursorSerializer; import org.apache.usergrid.corepersistence.pipeline.read.AbstractPathFilter; -import org.apache.usergrid.corepersistence.pipeline.read.Filter; import org.apache.usergrid.corepersistence.pipeline.read.FilterResult; import org.apache.usergrid.persistence.model.entity.Entity; import org.apache.usergrid.persistence.model.entity.Id; @@ -36,7 +35,7 @@ import rx.Observable; * A filter that is used when we can potentially serialize pages via cursor. This will filter the first result, only if * it matches the Id that was set */ -public class EntityResumeFilter extends AbstractPathFilter<Entity, Entity, Id> implements Filter<Entity, Entity> { +public class EntityResumeFilter extends AbstractPathFilter<Entity, Entity, Id> { @Override