Refactor into pipeline and filter pattern for higher level operations
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/b5e60e04 Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/b5e60e04 Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/b5e60e04 Branch: refs/heads/two-dot-o-dev Commit: b5e60e04fe891ff271448b611aebaa6455669f5b Parents: 7f2a4bb Author: Todd Nine <[email protected]> Authored: Tue Apr 28 14:29:25 2015 -0600 Committer: Todd Nine <[email protected]> Committed: Wed Apr 29 11:48:41 2015 -0600 ---------------------------------------------------------------------- .../corepersistence/CpRelationManager.java | 17 +- .../corepersistence/index/IndexServiceImpl.java | 45 +++- .../corepersistence/pipeline/DataPipeline.java | 136 ----------- .../corepersistence/pipeline/Pipeline.java | 123 ++++++++++ .../pipeline/PipelineContext.java | 94 ++++++++ .../pipeline/PipelineModule.java | 9 +- .../pipeline/PipelineResult.java | 57 +++++ .../pipeline/cursor/CursorSerializerUtil.java | 6 - .../pipeline/cursor/NoCursorSerializer.java | 55 ----- .../pipeline/cursor/RequestCursor.java | 7 +- .../pipeline/cursor/ResponseCursor.java | 15 +- .../pipeline/read/AbstractFilter.java | 110 --------- .../read/AbstractPipelineOperation.java | 44 ++++ .../pipeline/read/AbstractSeekingFilter.java | 103 ++++++++ .../pipeline/read/CandidateResultsFilter.java | 31 +++ .../pipeline/read/Collector.java | 31 +++ .../pipeline/read/CollectorFactory.java | 44 ++++ .../pipeline/read/CollectorFilter.java | 36 --- .../corepersistence/pipeline/read/Filter.java | 33 +-- .../pipeline/read/FilterFactory.java | 112 +++++++++ .../pipeline/read/PipelineOperation.java | 38 +++ .../pipeline/read/ReadFilterFactory.java | 102 -------- .../pipeline/read/ReadFilterFactoryImpl.java | 234 +++++++++---------- .../pipeline/read/ReadPipelineBuilder.java | 30 ++- .../pipeline/read/ReadPipelineBuilderImpl.java | 184 ++++++++++----- 
.../pipeline/read/ResultsPage.java | 42 ++++ .../pipeline/read/TraverseFilter.java | 30 --- .../AbstractElasticSearchFilter.java | 156 +++++++++++++ ...stractQueryElasticSearchCollectorFilter.java | 132 ----------- .../CandidateResultsEntityResultsCollector.java | 216 +++++++++++++++++ .../CandidateResultsIdVerifyFilter.java | 193 +++++++++++++++ .../CollectionElasticSearchFilter.java | 71 ++++++ .../ConnectionElasticSearchFilter.java | 72 ++++++ ...yCollectionElasticSearchCollectorFilter.java | 87 ------- ...yConnectionElasticSearchCollectorFilter.java | 91 -------- .../impl/ElasticSearchQueryExecutor.java | 69 +++--- .../pipeline/read/entity/EntityIdFilter.java | 15 +- .../read/entity/EntityLoadCollector.java | 94 ++++++++ .../read/entity/EntityLoadCollectorFilter.java | 137 ----------- .../graph/AbstractReadGraphEdgeByIdFilter.java | 17 +- .../read/graph/AbstractReadGraphFilter.java | 14 +- .../graph/ReadGraphConnectionByTypeFilter.java | 16 +- .../results/ObservableQueryExecutor.java | 62 ++++- .../org/apache/usergrid/persistence/Query.java | 100 ++++---- .../corepersistence/StaleIndexCleanupTest.java | 2 +- .../index/AsyncIndexServiceTest.java | 2 +- .../corepersistence/index/IndexServiceTest.java | 2 +- .../pipeline/cursor/CursorTest.java | 4 +- .../usergrid/persistence/CollectionIT.java | 12 +- .../org/apache/usergrid/persistence/GeoIT.java | 8 +- .../apache/usergrid/persistence/IndexIT.java | 8 +- .../PerformanceEntityRebuildIndexTest.java | 2 +- .../query/IntersectionTransitivePagingIT.java | 2 +- .../query/IntersectionUnionPagingIT.java | 2 +- .../persistence/query/IteratingQueryIT.java | 30 +-- .../persistence/query/NotSubPropertyIT.java | 2 +- .../index/ApplicationEntityIndex.java | 4 +- .../persistence/index/CandidateResults.java | 12 +- .../impl/EsApplicationEntityIndexImpl.java | 22 +- .../persistence/index/impl/EntityIndexTest.java | 74 +++--- .../persistence/index/impl/GeoPagingTest.java | 2 +- .../index/impl/IndexLoadTestsIT.java | 2 +- 62 files 
changed, 2087 insertions(+), 1415 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b5e60e04/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java index aa0056c..8c3865e 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java @@ -32,7 +32,9 @@ import org.springframework.util.Assert; import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService; import org.apache.usergrid.corepersistence.pipeline.PipelineBuilderFactory; +import org.apache.usergrid.corepersistence.pipeline.PipelineResult; import org.apache.usergrid.corepersistence.pipeline.read.ReadPipelineBuilder; +import org.apache.usergrid.corepersistence.pipeline.read.ResultsPage; import org.apache.usergrid.corepersistence.results.ObservableQueryExecutor; import org.apache.usergrid.corepersistence.util.CpEntityMapUtils; import org.apache.usergrid.corepersistence.util.CpNamingUtils; @@ -411,7 +413,7 @@ public class CpRelationManager implements RelationManager { logger.debug( "Wrote edge {}", edge ); } - indexService.queueEntityIndexUpdate( applicationScope, memberEntity ); + indexService.queueNewEdge( applicationScope, memberEntity, edge ); if ( logger.isDebugEnabled() ) { @@ -628,7 +630,7 @@ public class CpRelationManager implements RelationManager { pipelineBuilderFactory.createReadPipelineBuilder( applicationScope ); //set our fields applicable to both operations - readPipelineBuilder.withCursor( query.getOffsetCursor() ); + readPipelineBuilder.withCursor( query.getCursor() ); 
readPipelineBuilder.withLimit( query.getLimit() ); //TODO, this should be removed when the CP relation manager is removed @@ -642,7 +644,7 @@ public class CpRelationManager implements RelationManager { } - final Observable<Results> resultsObservable = readPipelineBuilder.build(); + final Observable<PipelineResult<ResultsPage>> resultsObservable = readPipelineBuilder.execute(); return new ObservableQueryExecutor( resultsObservable ).next(); } @@ -896,7 +898,7 @@ public class CpRelationManager implements RelationManager { pipelineBuilderFactory.createReadPipelineBuilder( applicationScope ); //set our fields applicable to both operations - readPipelineBuilder.withCursor( query.getOffsetCursor() ); + readPipelineBuilder.withCursor( query.getCursor() ); readPipelineBuilder.withLimit( query.getLimit() ); //TODO, this should be removed when the CP relation manager is removed @@ -905,15 +907,12 @@ public class CpRelationManager implements RelationManager { if ( query.isGraphSearch() ) { readPipelineBuilder.getConnection( connection ); } - else if ( entityType != null ) { - readPipelineBuilder.connectionWithQuery( connection, query.getQl().get(), entityType ); - } else { - readPipelineBuilder.connectionWithQuery( connection, query.getQl().get() ); + readPipelineBuilder.connectionWithQuery( connection, Optional.fromNullable( entityType ), query.getQl().get() ); } - final Observable<Results> resultsObservable = readPipelineBuilder.build(); + final Observable<PipelineResult<ResultsPage>> resultsObservable = readPipelineBuilder.execute(); return new ObservableQueryExecutor( resultsObservable ).next(); } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b5e60e04/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java index 71f02a8..c46542c 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java @@ -33,9 +33,11 @@ import org.apache.usergrid.persistence.graph.GraphManager; import org.apache.usergrid.persistence.graph.GraphManagerFactory; import org.apache.usergrid.persistence.graph.serialization.EdgesObservable; import org.apache.usergrid.persistence.index.ApplicationEntityIndex; +import org.apache.usergrid.persistence.index.EntityIndexBatch; import org.apache.usergrid.persistence.index.EntityIndexFactory; import org.apache.usergrid.persistence.index.IndexEdge; import org.apache.usergrid.persistence.index.IndexFig; +import org.apache.usergrid.persistence.index.impl.IndexOperation; import org.apache.usergrid.persistence.index.impl.IndexOperationMessage; import org.apache.usergrid.persistence.model.entity.Entity; import org.apache.usergrid.persistence.model.entity.Id; @@ -47,6 +49,7 @@ import com.google.inject.Inject; import com.google.inject.Singleton; import rx.Observable; +import rx.functions.Func1; import rx.observables.ConnectableObservable; import static org.apache.usergrid.corepersistence.util.CpNamingUtils.generateScopeFromSource; @@ -68,6 +71,7 @@ public class IndexServiceImpl implements IndexService { private final EdgesObservable edgesObservable; private final IndexFig indexFig; private final Timer indexTimer; + private final Timer addTimer; @Inject @@ -77,7 +81,8 @@ public class IndexServiceImpl implements IndexService { this.entityIndexFactory = entityIndexFactory; this.edgesObservable = edgesObservable; this.indexFig = indexFig; - this.indexTimer = metricsFactory.getTimer( IndexServiceImpl.class, "index.process"); + this.indexTimer = metricsFactory.getTimer( IndexServiceImpl.class, "index.update_all"); + this.addTimer = 
metricsFactory.getTimer( IndexServiceImpl.class, "index.add" ); } @@ -128,15 +133,43 @@ public class IndexServiceImpl implements IndexService { @Override - public Observable<IndexOperationMessage> indexEdge( final ApplicationScope applicationScope, final Entity entity, - final Edge edge ) { - throw new NotImplementedException( "Implement me" ); + public Observable<IndexOperationMessage> indexEdge( final ApplicationScope applicationScope, final Entity entity, final Edge edge ) { + + + + final Observable<IndexOperationMessage> batches = Observable.just( edge ).map( observableEdge -> { + + //if the node is the + if ( edge.getTargetNode().equals( entity.getId() ) ) { + return generateScopeFromSource( edge ); + } + + return generateScopeToTarget( edge ); + } ).flatMap( indexEdge -> { + + final ApplicationEntityIndex ei = entityIndexFactory.createApplicationEntityIndex( applicationScope ); + + + final EntityIndexBatch batch = ei.createBatch(); + + batch.index( indexEdge, entity ); + + return batch.execute(); + } ); + + return ObservableTimer.time( batches, addTimer ); + + } @Override public Observable<IndexOperationMessage> deleteIndexEdge( final ApplicationScope applicationScope, final Edge edge ) { + + + //TODO, query ES and remove this edge + throw new NotImplementedException( "Implement me" ); } @@ -144,6 +177,8 @@ public class IndexServiceImpl implements IndexService { @Override public Observable<IndexOperationMessage> deleteEntityIndexes( final ApplicationScope applicationScope, final Id entityId ) { + + //TODO query ES and remove this entityId throw new NotImplementedException( "Implement me" ); } @@ -189,4 +224,6 @@ public class IndexServiceImpl implements IndexService { + + } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b5e60e04/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/DataPipeline.java ---------------------------------------------------------------------- diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/DataPipeline.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/DataPipeline.java deleted file mode 100644 index 8463df9..0000000 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/DataPipeline.java +++ /dev/null @@ -1,136 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.usergrid.corepersistence.pipeline; - - -import java.util.ArrayList; -import java.util.List; - -import org.apache.usergrid.corepersistence.pipeline.cursor.RequestCursor; -import org.apache.usergrid.corepersistence.pipeline.cursor.ResponseCursor; -import org.apache.usergrid.corepersistence.pipeline.read.CollectorFilter; -import org.apache.usergrid.corepersistence.pipeline.read.Filter; -import org.apache.usergrid.corepersistence.pipeline.read.TraverseFilter; -import org.apache.usergrid.persistence.core.scope.ApplicationScope; -import org.apache.usergrid.persistence.model.entity.Id; - -import com.google.common.base.Optional; - -import rx.Observable; - - -/** - * A pipeline that will allow us to build a traversal command for execution - * - * See http://martinfowler.com/articles/collection-pipeline/ for some examples - * - * TODO: Re work the cursor and limit phases. They need to be lazily evaluated, not added on build time - */ -public class DataPipeline { - - - private final ApplicationScope applicationScope; - private final List<TraverseFilter> traverseFilterList; - - private Optional<String> cursor; - private int limit; - - - private int count = 0; - - - /** - * Our first pass, where we implement our start point as an Id until we can use this to perform our entire - * traversal. Eventually as we untangle the existing Query service nightmare, the sourceId will be remove and - * should only be traversed from the root application - */ - public DataPipeline( final ApplicationScope applicationScope ) { - - this.applicationScope = applicationScope; - - - traverseFilterList = new ArrayList<>(); - } - - - /** - * Add a read command that will read Ids and produce Ids. 
This is an intermediate traversal operations - */ - public DataPipeline withTraverseCommand( final TraverseFilter traverseCommand ) { - - this.traverseFilterList.add( traverseCommand ); - - return this; - } - - - /** - * Build the final collection step, and process our filters - */ - public <T> Observable<T> build( final CollectorFilter<T> pipeCollector ) { - - RequestCursor requestCursor = new RequestCursor( this.cursor ); - ResponseCursor responseCursor = new ResponseCursor(); - - Observable<Id> traverseObservable = Observable.just( applicationScope.getApplication() ); - - //build our traversal commands - for ( TraverseFilter filter : traverseFilterList ) { - setState( filter, requestCursor, responseCursor ); - - traverseObservable = traverseObservable.compose( filter ); - } - - - setState( pipeCollector, requestCursor, responseCursor ); - - pipeCollector.setLimit( limit ); - - return traverseObservable.compose( pipeCollector ); - } - - - public void setCursor( Optional<String> cursor ) { - this.cursor = cursor; - } - - - public void setLimit( final int limit ) { - this.limit = limit; - } - - - /** - * Set the id of the state - */ - private void setState( final Filter<?> filter, final RequestCursor requestCursor, - final ResponseCursor responseCursor ) { - - //TODO, see if we can wrap this observable in our ObservableTimer so we can see how long each filter takes - - - filter.setId( count ); - //done for clarity - count++; - - filter.setCursorCaches( requestCursor, responseCursor ); - filter.setApplicationScope( applicationScope ); - } -} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b5e60e04/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/Pipeline.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/Pipeline.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/Pipeline.java new file mode 100644 
index 0000000..bc93b6c --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/Pipeline.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.corepersistence.pipeline; + + +import java.util.List; + +import org.apache.usergrid.corepersistence.pipeline.cursor.RequestCursor; +import org.apache.usergrid.corepersistence.pipeline.cursor.ResponseCursor; +import org.apache.usergrid.corepersistence.pipeline.read.Collector; +import org.apache.usergrid.corepersistence.pipeline.read.PipelineOperation; +import org.apache.usergrid.persistence.core.scope.ApplicationScope; + +import com.google.common.base.Optional; + +import rx.Observable; + + +/** + * A pipeline that will allow us to build a traversal command for execution + * + * See http://martinfowler.com/articles/collection-pipeline/ for some examples + * + * TODO: Re work the cursor and limit phases. 
They need to be lazily evaluated, not added on build time + */ +public class Pipeline<R> { + + + private final ApplicationScope applicationScope; + private final List<PipelineOperation> idPipelineOperationList; + private final Collector<?, R> collector; + private final RequestCursor requestCursor; + private final ResponseCursor responseCursor; + + private final int limit; + + + private int idCount = 0; + + + /** + * Our first pass, where we implement our start point as an Id until we can use this to perform our entire + * traversal. Eventually as we untangle the existing Query service nightmare, the sourceId will be remove and + * should only be traversed from the root application + */ + public Pipeline( final ApplicationScope applicationScope, final List<PipelineOperation> pipelineOperations, + final Collector<?, R> collector, final Optional<String> cursor, final int limit ) { + + this.applicationScope = applicationScope; + this.idPipelineOperationList = pipelineOperations; + this.collector = collector; + this.limit = limit; + + this.requestCursor = new RequestCursor( cursor ); + this.responseCursor = new ResponseCursor(); + } + + + /** + * Execute the pipline construction, returning an observable of results + * @return + */ + public Observable<PipelineResult<R>> execute(){ + + + Observable traverseObservable = Observable.just( applicationScope.getApplication() ); + + //build our traversal commands + for ( PipelineOperation pipelineOperation : idPipelineOperationList ) { + setState( pipelineOperation ); + + //TODO, see if we can wrap this observable in our ObservableTimer so we can see how long each filter takes + + + traverseObservable = traverseObservable.compose( pipelineOperation ); + } + + + setState( collector ); + + final Observable<R> response = traverseObservable.compose( collector ); + + + //append the optional cursor into the response for the caller to use + return response.map( result -> new PipelineResult<>( result, responseCursor ) ); + } + + + + + 
/** + * Set the id of the state + */ + private void setState( final PipelineOperation pipelineOperation ) { + + + final PipelineContext context = new PipelineContext( applicationScope, requestCursor, responseCursor, + limit, idCount ); + + pipelineOperation.setContext( context ); + + //done for clarity + idCount++; + + } +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b5e60e04/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineContext.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineContext.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineContext.java new file mode 100644 index 0000000..325f876 --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineContext.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.usergrid.corepersistence.pipeline; + + +import java.io.Serializable; + +import org.apache.usergrid.corepersistence.pipeline.cursor.CursorSerializer; +import org.apache.usergrid.corepersistence.pipeline.cursor.RequestCursor; +import org.apache.usergrid.corepersistence.pipeline.cursor.ResponseCursor; +import org.apache.usergrid.persistence.core.scope.ApplicationScope; + +import com.google.common.base.Optional; + + +/** + * Encapsulates the context of the pipeline for the scope of the filter. + */ +public class PipelineContext { + + private final int id; + private final ApplicationScope applicationScope; + private final RequestCursor requestCursor; + private final ResponseCursor responseCursor; + private final int limit; + + + public PipelineContext( final ApplicationScope applicationScope, final RequestCursor requestCursor, + final ResponseCursor responseCursor, final int limit, final int id ) { + + this.applicationScope = applicationScope; + this.requestCursor = requestCursor; + this.responseCursor = responseCursor; + this.limit = limit; + this.id = id; + } + + + public ApplicationScope getApplicationScope() { + return applicationScope; + } + + + public int getId() { + return id; + } + + + /** + * Get our cursor value if present + * @param serializer + */ + public <T extends Serializable> Optional<T> getCursor( final CursorSerializer<T> serializer ) { + final T value = requestCursor.getCursor( id, serializer ); + + return Optional.fromNullable( value ); + } + + + /** + * Set the cursor value into our resposne + */ + public <T extends Serializable> void setCursorValue( final T value, final CursorSerializer<T> serializer ) { + responseCursor.setCursor( id, value, serializer ); + } + + + /** + * Get the limit for this execution + * @return + */ + public int getLimit() { + return limit; + } + + +} 
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b5e60e04/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineModule.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineModule.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineModule.java index 55b84af..3018718 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineModule.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineModule.java @@ -20,7 +20,8 @@ package org.apache.usergrid.corepersistence.pipeline; -import org.apache.usergrid.corepersistence.pipeline.read.ReadFilterFactory; +import org.apache.usergrid.corepersistence.pipeline.read.CollectorFactory; +import org.apache.usergrid.corepersistence.pipeline.read.FilterFactory; import org.apache.usergrid.corepersistence.pipeline.read.ReadFilterFactoryImpl; import org.apache.usergrid.corepersistence.pipeline.read.ReadPipelineBuilder; import org.apache.usergrid.corepersistence.pipeline.read.ReadPipelineBuilderImpl; @@ -38,7 +39,7 @@ public class PipelineModule extends AbstractModule { protected void configure() { //Use Guice to create the builder since we don't really need to do anything //other than DI when creating the filters - bind( ReadFilterFactory.class ).to( ReadFilterFactoryImpl.class ); +// bind( FilterFactory.class ).to( ReadFilterFactoryImpl.class ); //Use Guice to create the builder since we don't really need to do anything @@ -47,9 +48,11 @@ public class PipelineModule extends AbstractModule { .build( PipelineBuilderFactory.class ) ); +// install( new Factory) //Use Guice to create the builder since we don't really need to do anything //other than DI when creating the filters -// install( new FactoryModuleBuilder().build( ReadFilterFactory.class ) ); + install( new FactoryModuleBuilder().build( 
FilterFactory.class ) ); + install( new FactoryModuleBuilder().build( CollectorFactory.class )); } } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b5e60e04/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineResult.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineResult.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineResult.java new file mode 100644 index 0000000..fe8604e --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineResult.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.usergrid.corepersistence.pipeline; + + +import org.apache.usergrid.corepersistence.pipeline.cursor.ResponseCursor; + +import com.google.common.base.Optional; + + +/** + * Intermediate observable that will return results, as well as an optional cursor + * @param <R> + */ +public class PipelineResult<R> { + + + private final R result; + + private final ResponseCursor responseCursor; + + + public PipelineResult( final R result, final ResponseCursor responseCursor ) { + this.result = result; + this.responseCursor = responseCursor; + } + + + /** + * If the user requests our cursor, return the cursor + * @return + */ + public Optional<String> getCursor(){ + return this.responseCursor.encodeAsString(); + } + + public R getResult(){ + return result; + } +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b5e60e04/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/CursorSerializerUtil.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/CursorSerializerUtil.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/CursorSerializerUtil.java index 05c1018..fea0364 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/CursorSerializerUtil.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/CursorSerializerUtil.java @@ -35,15 +35,9 @@ public class CursorSerializerUtil { private static final ObjectMapper MAPPER = new ObjectMapper( SMILE_FACTORY ); - private static final Base64Variant VARIANT = Base64Variants.MODIFIED_FOR_URL; - public static ObjectMapper getMapper() { return MAPPER; } - - public static Base64Variant getBase64() { - return VARIANT; - } } 
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b5e60e04/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/NoCursorSerializer.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/NoCursorSerializer.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/NoCursorSerializer.java deleted file mode 100644 index 1d42df4..0000000 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/NoCursorSerializer.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.usergrid.corepersistence.pipeline.cursor; - - -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; - - -/** - * Interface for cursor serialization - * - * TODO, the need for this seems to indicate an issue with our object composition. 
Refactor this away - */ -public class NoCursorSerializer<T> implements CursorSerializer<T> { - - private static final NoCursorSerializer<Object> INSTANCE = new NoCursorSerializer<>(); - - - @Override - public T fromJsonNode( final JsonNode node, final ObjectMapper objectMapper ) { - return null; - } - - - @Override - public JsonNode toNode( final ObjectMapper objectMapper, final T value ) { - return objectMapper.createObjectNode(); - } - - - /** - * convenience for type casting - */ - public static <T> NoCursorSerializer<T> create() { - return ( NoCursorSerializer<T> ) INSTANCE; - } -} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b5e60e04/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/RequestCursor.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/RequestCursor.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/RequestCursor.java index b117c21..870edbb 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/RequestCursor.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/RequestCursor.java @@ -45,7 +45,6 @@ public class RequestCursor { private static final int MAX_CURSOR_COUNT = 100; private static final ObjectMapper MAPPER = CursorSerializerUtil.getMapper(); - private static final Base64Variant VARIANT = CursorSerializerUtil.getBase64(); private final Map<Integer, JsonNode> parsedCursor; @@ -62,11 +61,17 @@ public class RequestCursor { /** * Get the cursor with the specified id + * + * May return null if not found */ public <T> T getCursor( final int id, final CursorSerializer<T> serializer ) { final JsonNode node = parsedCursor.get( id ); + if(node == null){ + return null; + } + return serializer.fromJsonNode( node, MAPPER ); } 
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b5e60e04/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/ResponseCursor.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/ResponseCursor.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/ResponseCursor.java index e379a34..f1c8c24 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/ResponseCursor.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/ResponseCursor.java @@ -30,6 +30,7 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; +import com.google.common.base.Optional; /** @@ -39,7 +40,6 @@ public class ResponseCursor { private static final ObjectMapper MAPPER = CursorSerializerUtil.getMapper(); - private static final Base64Variant VARIANT = CursorSerializerUtil.getBase64(); /** * We use a map b/c some indexes might be skipped @@ -61,8 +61,14 @@ public class ResponseCursor { /** * now we're done, encode as a string */ - public String encodeAsString() { + public Optional<String> encodeAsString() { try { + + if(cursors.isEmpty()){ + return Optional.absent(); + } + + final ObjectNode map = MAPPER.createObjectNode(); for ( Map.Entry<Integer, CursorEntry<?>> entry : cursors.entrySet() ) { @@ -78,8 +84,9 @@ public class ResponseCursor { final byte[] output = MAPPER.writeValueAsBytes(map); //generate a base64 url safe string - return Base64.getUrlEncoder().encodeToString( output ); -// return MAPPER.writer( VARIANT ).writeValueAsString( map ); + final String value = Base64.getUrlEncoder().encodeToString( output ); + + return Optional.of( value ); } catch ( JsonProcessingException e ) { 
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b5e60e04/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractFilter.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractFilter.java deleted file mode 100644 index 3564a79..0000000 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractFilter.java +++ /dev/null @@ -1,110 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.usergrid.corepersistence.pipeline.read; - - -import java.io.Serializable; - -import org.apache.usergrid.corepersistence.pipeline.cursor.CursorSerializer; -import org.apache.usergrid.corepersistence.pipeline.cursor.RequestCursor; -import org.apache.usergrid.corepersistence.pipeline.cursor.ResponseCursor; -import org.apache.usergrid.persistence.core.scope.ApplicationScope; - -import com.google.common.base.Optional; - - -/** - * Basic functionality for our commands to handle cursor IO - */ -public abstract class AbstractFilter<T, C extends Serializable> implements Filter<T> { - - private int id; - /** - * The cache of the cursor that was set when the read was started - */ - private RequestCursor readCache; - - /** - * The current state of the write cache. Gets updated as we traverse the observables - */ - private ResponseCursor writeCache; - - - /** - * The applicationScope - */ - protected ApplicationScope applicationScope; - - - @Override - public void setId( final int id ) { - this.id = id; - } - - - @Override - public void setCursorCaches( final RequestCursor readCache, final ResponseCursor writeCache ) { - this.readCache = readCache; - this.writeCache = writeCache; - } - - - @Override - public void setApplicationScope( final ApplicationScope applicationScope ) { - this.applicationScope = applicationScope; - } - - - /** - * Return the parsed value of the cursor from the last request, if it exists - */ - protected Optional<C> getCursor() { - final C cursor = readCache.getCursor( id, getCursorSerializer() ); - - return Optional.fromNullable( cursor ); - } - - - - - - /** - * Set the cursor value into the new cursor write cache - * @param newValue - */ - protected void setCursor(final C newValue){ - writeCache.setCursor( id, newValue, getCursorSerializer() ); - } - - - /** - * Generate our state as a cursor - * @return - */ - protected String generateCursor(){ - return writeCache.encodeAsString(); - } - - /** - * Return the class to be used 
when parsing the cursor - */ - protected abstract CursorSerializer<C> getCursorSerializer(); - -} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b5e60e04/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractPipelineOperation.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractPipelineOperation.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractPipelineOperation.java new file mode 100644 index 0000000..8d7f106 --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractPipelineOperation.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.usergrid.corepersistence.pipeline.read; + + +import org.apache.usergrid.corepersistence.pipeline.PipelineContext; + + +/** + * Basic functionality for our commands to handle cursor IO + * @param <T> the input type + * @param <R> The output Type + */ +public abstract class AbstractPipelineOperation<T, R> implements PipelineOperation<T, R> { + + + protected PipelineContext pipelineContext; + + + @Override + public void setContext( final PipelineContext pipelineContext ) { + this.pipelineContext = pipelineContext; + } + + + +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b5e60e04/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractSeekingFilter.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractSeekingFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractSeekingFilter.java new file mode 100644 index 0000000..9509678 --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractSeekingFilter.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.corepersistence.pipeline.read; + + +import java.io.Serializable; + +import org.apache.usergrid.corepersistence.pipeline.cursor.CursorSerializer; + +import com.google.common.base.Optional; + + +/** + * Abstract class for filters to extend that require a cursor + * @param <T> The input type + * @param <R> The response type + * @param <C> The cursor type + */ +public abstract class AbstractSeekingFilter<T, R, C extends Serializable> extends AbstractPipelineOperation<T, R> implements Filter<T, R> { + + + + //TODO not a big fan of this, but not sure how to build resume otherwise + private CursorSeek<C> cursorSeek; + + + /** + * Return the parsed value of the cursor from the last request, if it exists + */ + protected Optional<C> getSeekValue() { + + if(cursorSeek == null) { + + final Optional<C> cursor = pipelineContext.getCursor( getCursorSerializer() ); + cursorSeek = new CursorSeek<>( cursor ); + } + + return cursorSeek.getSeekValue(); + + } + + + /** + * Sets the cursor into our pipeline context + * @param newValue + */ + protected void setCursor(final C newValue){ + pipelineContext.setCursorValue( newValue, getCursorSerializer() ); + } + + + /** + * Return the class to be used when parsing the cursor + */ + protected abstract CursorSerializer<C> getCursorSerializer(); + + + /** + * An internal class that holds a mutable state. When resuming, we only ever honor the seek value on the first call. Afterwards, we will seek from the beginning on newly emitted values. + * Calling get will return the first value to seek, or absent if not specified. Subsequent calls will return absent. 
Callers should treat the results as seek values for each operation + */ + protected static class CursorSeek<C> { + + private Optional<C> seek; + + private CursorSeek(final Optional<C> cursorValue){ + seek = cursorValue; + } + + + /** + * Get the seek value to use when searching + * @return + */ + public Optional<C> getSeekValue(){ + final Optional<C> toReturn = seek; + + seek = Optional.absent(); + + return toReturn; + } + + + + } +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b5e60e04/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/CandidateResultsFilter.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/CandidateResultsFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/CandidateResultsFilter.java new file mode 100644 index 0000000..4e6d06e --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/CandidateResultsFilter.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.usergrid.corepersistence.pipeline.read; + + +import org.apache.usergrid.persistence.index.CandidateResults; +import org.apache.usergrid.persistence.model.entity.Id; + + +/** + * Filter that produces candidate results. Takes an observable of ids, and emits + * an observable of candidate results + */ +public interface CandidateResultsFilter extends PipelineOperation<Id, CandidateResults> {} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b5e60e04/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/Collector.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/Collector.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/Collector.java new file mode 100644 index 0000000..69d929c --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/Collector.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.usergrid.corepersistence.pipeline.read; + + +/** + * A command that is used to reduce our stream of results into a final output + * @param <T> + */ +public interface Collector<T, R> extends PipelineOperation<T, R> { + + + +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b5e60e04/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/CollectorFactory.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/CollectorFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/CollectorFactory.java new file mode 100644 index 0000000..6893b34 --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/CollectorFactory.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.usergrid.corepersistence.pipeline.read; + + +import org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.CandidateResultsEntityResultsCollector; +import org.apache.usergrid.corepersistence.pipeline.read.entity.EntityLoadCollector; + + +/** + * A factory for generating collectors + */ +public interface CollectorFactory { + + /** + * Generate a new instance of the command with the specified parameters + */ + EntityLoadCollector entityLoadCollector(); + + /** + * Get the collector for collection candidate results to entities + * @return + */ + CandidateResultsEntityResultsCollector candidateResultsEntityResultsCollector(); + + +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b5e60e04/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/CollectorFilter.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/CollectorFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/CollectorFilter.java deleted file mode 100644 index 883e910..0000000 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/CollectorFilter.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.usergrid.corepersistence.pipeline.read; - - -/** - * A command that is used to reduce our stream of results into a final output - * @param <T> - */ -public interface CollectorFilter<T> extends Filter<T> { - - /** - * Set the prefered result size for the command - * @param limit - */ - void setLimit( final int limit ); - - -} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b5e60e04/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/Filter.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/Filter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/Filter.java index f50a2f4..ace62db 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/Filter.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/Filter.java @@ -20,38 +20,11 @@ package org.apache.usergrid.corepersistence.pipeline.read; -import org.apache.usergrid.corepersistence.pipeline.cursor.RequestCursor; -import org.apache.usergrid.corepersistence.pipeline.cursor.ResponseCursor; -import org.apache.usergrid.persistence.core.scope.ApplicationScope; import org.apache.usergrid.persistence.model.entity.Id; -import rx.Observable; - /** - * Interface for filtering commands. All filters must take an observable of Id's as an input. Output is then determined by subclasses. 
- * This takes an input of Id, performs some operation, and emits values for further processing in the Observable - * pipeline + * Traverses edges in the graph. Either by query or graph traversal. Take an observable of ids, and emits + * an observable of ids */ -public interface Filter<T> extends Observable.Transformer<Id, T> { - - - /** - * Set the id of this filter in it's execution environment - */ - void setId( final int id ); - - /** - * Set the cursor cache into the filter - * - * @param readCache Set the cache that was used in the request - * @param writeCache Set the cache to be used when writing the results - */ - void setCursorCaches( final RequestCursor readCache, final ResponseCursor writeCache ); - - /** - * Set the application scope of the filter - * @param applicationScope - */ - void setApplicationScope(final ApplicationScope applicationScope); -} +public interface Filter<T, R> extends PipelineOperation<T, R> {} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b5e60e04/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterFactory.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterFactory.java new file mode 100644 index 0000000..7a61961 --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterFactory.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.corepersistence.pipeline.read; + + +import org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.CandidateResultsIdVerifyFilter; +import org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.CollectionElasticSearchFilter; +import org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.ConnectionElasticSearchFilter; +import org.apache.usergrid.corepersistence.pipeline.read.entity.EntityIdFilter; +import org.apache.usergrid.corepersistence.pipeline.read.graph.ReadGraphCollectionByIdFilter; +import org.apache.usergrid.corepersistence.pipeline.read.graph.ReadGraphCollectionFilter; +import org.apache.usergrid.corepersistence.pipeline.read.graph.ReadGraphConnectionByIdFilter; +import org.apache.usergrid.corepersistence.pipeline.read.graph.ReadGraphConnectionByTypeFilter; +import org.apache.usergrid.corepersistence.pipeline.read.graph.ReadGraphConnectionFilter; +import org.apache.usergrid.persistence.model.entity.Id; + +import com.google.common.base.Optional; +import com.google.inject.assistedinject.Assisted; + + +/** + * A factory for generating read commands + */ +public interface FilterFactory { + + + /** + * Generate a new instance of the command with the specified parameters + * @param collectionName The collection name to use when reading the graph + */ + ReadGraphCollectionFilter readGraphCollectionFilter( final String collectionName ); + + /** + * Read a connection between two entities, the incoming and the target entity + * + * @param collectionName The collection name to use when 
reading the edge + * @param targetId The target id to use when traversing the graph + */ + ReadGraphCollectionByIdFilter readGraphCollectionByIdFilter( final String collectionName, final Id targetId ); + + /** + * Generate a new instance of the command with the specified parameters + * @param connectionName The connection name to use when traversing the graph + */ + ReadGraphConnectionFilter readGraphConnectionFilter( final String connectionName ); + + /** + * Generate a new instance of the command with the specified parameters + * @param connectionName The connection name to use when traversing the graph + * @param entityType The entity type to use when traversing the graph + */ + ReadGraphConnectionByTypeFilter readGraphConnectionByTypeFilter( + @Assisted( "connectionName" ) final String connectionName, @Assisted( "entityType" ) final String entityType ); + + + /** + * Read a connection directly between two identifiers + * @param connectionName The connection name to use when traversing the graph + * @param targetId The target Id to use when traversing the graph + */ + ReadGraphConnectionByIdFilter readGraphConnectionByIdFilter( final String connectionName, final Id targetId ); + + /** + * Generate a new instance of the command with the specified parameters + * @param query The query to use when querying the entities in the collection + * @param collectionName The collection name to use when querying + */ + CollectionElasticSearchFilter collectionElasticSearchFilter( @Assisted( "query" ) final String query, + @Assisted( "collectionName" ) + final String collectionName ); + + + /** + * Generate a new instance of the command with the specified parameters + * @param query The query to use when querying the entities in the connection + * @param connectionName The type of connection to query + * @param connectedEntityType The type of entity in the connection. 
Leave absent to query all entity types + */ + ConnectionElasticSearchFilter connectionElasticSearchFilter( @Assisted( "query" ) final String query, + @Assisted( "connectionName" ) final String connectionName, + @Assisted("connectedEntityType") final Optional<String> connectedEntityType); + + + /** + * Get a candidate ids verifier for collection results. Should be inserted into pipelines where a query filter is an intermediate step, + * not a final filter before collectors + */ + CandidateResultsIdVerifyFilter candidateResultsIdVerifyFilter(); + + /** + * Get an entity id filter. Used as a 1.0->2.0 bridge since we're not doing full traversals + * @param entityId The entity id to emit + */ + EntityIdFilter getEntityIdFilter( final Id entityId ); +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b5e60e04/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/PipelineOperation.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/PipelineOperation.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/PipelineOperation.java new file mode 100644 index 0000000..28bba36 --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/PipelineOperation.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.corepersistence.pipeline.read; + + +import org.apache.usergrid.corepersistence.pipeline.PipelineContext; + +import rx.Observable; + + +/** + * Interface for pipeline operations. All operations take an observable of the input type as an input. Output is then determined by subclasses. + * This takes an input of type T, performs some operation, and emits values of type R for further processing in the Observable + * pipeline + * @param <T> The input type + * @param <R> The output type + */ +public interface PipelineOperation< T, R> extends Observable.Transformer<T, R> { + + void setContext(final PipelineContext pipelineContext); +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b5e60e04/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadFilterFactory.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadFilterFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadFilterFactory.java deleted file mode 100644 index 92bdacb..0000000 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadFilterFactory.java +++ /dev/null @@ -1,102 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.usergrid.corepersistence.pipeline.read; - - -import org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.QueryCollectionElasticSearchCollectorFilter; -import org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.QueryConnectionElasticSearchCollectorFilter; -import org.apache.usergrid.corepersistence.pipeline.read.entity.EntityIdFilter; -import org.apache.usergrid.corepersistence.pipeline.read.entity.EntityLoadCollectorFilter; -import org.apache.usergrid.corepersistence.pipeline.read.graph.ReadGraphCollectionByIdFilter; -import org.apache.usergrid.corepersistence.pipeline.read.graph.ReadGraphCollectionFilter; -import org.apache.usergrid.corepersistence.pipeline.read.graph.ReadGraphConnectionByIdFilter; -import org.apache.usergrid.corepersistence.pipeline.read.graph.ReadGraphConnectionByTypeFilter; -import org.apache.usergrid.corepersistence.pipeline.read.graph.ReadGraphConnectionFilter; -import org.apache.usergrid.persistence.model.entity.Id; - -import com.google.common.base.Optional; - - -/** - * A factory for generating read commands - */ -public interface ReadFilterFactory { - - - /** - * Generate a new instance of the command with the specified parameters - */ - ReadGraphCollectionFilter readGraphCollectionCommand( final String collectionName ); - - /** - * Read a connection between two entities, the incoming and the target entity - * 
@param collectionName - * @param targetId - * @return - */ - ReadGraphCollectionByIdFilter readGraphCollectionByIdFilter(final String collectionName, final Id targetId); - - /** - * Generate a new instance of the command with the specified parameters - */ - ReadGraphConnectionFilter readGraphConnectionCommand( final String connectionName ); - - /** - * Generate a new instance of the command with the specified parameters - */ - ReadGraphConnectionByTypeFilter readGraphConnectionCommand( final String connectionName, final String entityType ); - - - /** - * Read a connection directly between two identifiers - * @param connectionName - * @param targetId - * @return - */ - ReadGraphConnectionByIdFilter readGraphConnectionByIdFilter(final String connectionName, final Id targetId); - - /** - * Generate a new instance of the command with the specified parameters - */ - EntityLoadCollectorFilter entityLoadCollector(); - - /** - * Generate a new instance of the command with the specified parameters - */ - QueryCollectionElasticSearchCollectorFilter queryCollectionElasticSearchCollector( final String collectionName, final String query); - - - /** - * Generate a new instance of the command with the specified parameters - */ - QueryConnectionElasticSearchCollectorFilter queryConnectionElasticSearchCollector( final String connectionName,final String query); - - - /** - * Generate a new instance of the command with the specified parameters - */ - QueryConnectionElasticSearchCollectorFilter queryConnectionElasticSearchCollector( final String connectionName, final String connectionEntityType, final String query ); - - - /** - * Get an entity id filter. 
Used as a 1.0->2.0 bridge since we're not doing full traversals - */ - EntityIdFilter getEntityIdFilter( final Id entityId ); -} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b5e60e04/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadFilterFactoryImpl.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadFilterFactoryImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadFilterFactoryImpl.java index 19162bb..0f73fb9 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadFilterFactoryImpl.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadFilterFactoryImpl.java @@ -20,133 +20,117 @@ package org.apache.usergrid.corepersistence.pipeline.read; -import org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.QueryCollectionElasticSearchCollectorFilter; -import org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.QueryConnectionElasticSearchCollectorFilter; -import org.apache.usergrid.corepersistence.pipeline.read.entity.EntityIdFilter; -import org.apache.usergrid.corepersistence.pipeline.read.entity.EntityLoadCollectorFilter; -import org.apache.usergrid.corepersistence.pipeline.read.graph.ReadGraphCollectionByIdFilter; -import org.apache.usergrid.corepersistence.pipeline.read.graph.ReadGraphCollectionFilter; -import org.apache.usergrid.corepersistence.pipeline.read.graph.ReadGraphConnectionByIdFilter; -import org.apache.usergrid.corepersistence.pipeline.read.graph.ReadGraphConnectionByTypeFilter; -import org.apache.usergrid.corepersistence.pipeline.read.graph.ReadGraphConnectionFilter; -import org.apache.usergrid.persistence.Query; -import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory; -import org.apache.usergrid.persistence.graph.GraphManagerFactory; 
-import org.apache.usergrid.persistence.index.EntityIndexFactory; -import org.apache.usergrid.persistence.model.entity.Id; - -import com.google.inject.Inject; import com.google.inject.Singleton; @Singleton -public class ReadFilterFactoryImpl implements ReadFilterFactory { - - - private final GraphManagerFactory graphManagerFactory; - private final EntityIndexFactory entityIndexFactory; - private final EntityCollectionManagerFactory entityCollectionManagerFactory; - - - @Inject - public ReadFilterFactoryImpl( final GraphManagerFactory graphManagerFactory, - final EntityIndexFactory entityIndexFactory, - final EntityCollectionManagerFactory entityCollectionManagerFactory ) { - - - this.graphManagerFactory = graphManagerFactory; - this.entityIndexFactory = entityIndexFactory; - this.entityCollectionManagerFactory = entityCollectionManagerFactory; - } - - - @Override - public ReadGraphCollectionFilter readGraphCollectionCommand( final String collectionName ) { - return new ReadGraphCollectionFilter( graphManagerFactory, collectionName ); - } - - - @Override - public ReadGraphCollectionByIdFilter readGraphCollectionByIdFilter( final String collectionName, - final Id targetId ) { - return new ReadGraphCollectionByIdFilter( graphManagerFactory, collectionName, targetId ); - } - - - @Override - public ReadGraphConnectionFilter readGraphConnectionCommand( final String connectionName ) { - return new ReadGraphConnectionFilter( graphManagerFactory, connectionName ); - } - - - @Override - public ReadGraphConnectionByTypeFilter readGraphConnectionCommand( final String connectionName, - final String entityType ) { - return new ReadGraphConnectionByTypeFilter( graphManagerFactory, connectionName, entityType ); - } - - - @Override - public ReadGraphConnectionByIdFilter readGraphConnectionByIdFilter( final String connectionName, - final Id targetId ) { - return new ReadGraphConnectionByIdFilter( graphManagerFactory, connectionName, targetId ); - } - - - @Override - public 
EntityLoadCollectorFilter entityLoadCollector() { - return new EntityLoadCollectorFilter( entityCollectionManagerFactory ); - } - - - /** - * TODO refactor these impls to use RX internally, as well as remove the query object - */ - @Override - public QueryCollectionElasticSearchCollectorFilter queryCollectionElasticSearchCollector( - final String collectionName, final String query ) { - - final Query queryObject = Query.fromQL( query ); - - final QueryCollectionElasticSearchCollectorFilter filter = - new QueryCollectionElasticSearchCollectorFilter( entityCollectionManagerFactory, entityIndexFactory, - collectionName, queryObject ); - - return filter; - } - - - @Override - public QueryConnectionElasticSearchCollectorFilter queryConnectionElasticSearchCollector( - final String connectionName, final String query ) { - - final Query queryObject = Query.fromQL( query ); - - final QueryConnectionElasticSearchCollectorFilter filter = - new QueryConnectionElasticSearchCollectorFilter( entityCollectionManagerFactory, entityIndexFactory, - connectionName, queryObject ); - - return filter; - } - - - @Override - public QueryConnectionElasticSearchCollectorFilter queryConnectionElasticSearchCollector( - final String connectionName, final String connectionEntityType, final String query ) { - - final Query queryObject = Query.fromQL( query ); - queryObject.setConnectionType( connectionEntityType ); - - final QueryConnectionElasticSearchCollectorFilter filter = - new QueryConnectionElasticSearchCollectorFilter( entityCollectionManagerFactory, entityIndexFactory, - connectionName, queryObject ); - - return filter; - } - - - @Override - public EntityIdFilter getEntityIdFilter( final Id entityId ) { - return new EntityIdFilter( entityId ); - } +public class ReadFilterFactoryImpl { //implements ReadFilterFactory { + +// +// private final GraphManagerFactory graphManagerFactory; +// private final EntityIndexFactory entityIndexFactory; +// private final EntityCollectionManagerFactory 
entityCollectionManagerFactory; +// +// +// @Inject +// public ReadFilterFactoryImpl( final GraphManagerFactory graphManagerFactory, +// final EntityIndexFactory entityIndexFactory, +// final EntityCollectionManagerFactory entityCollectionManagerFactory ) { +// +// +// this.graphManagerFactory = graphManagerFactory; +// this.entityIndexFactory = entityIndexFactory; +// this.entityCollectionManagerFactory = entityCollectionManagerFactory; +// } +// +// +// @Override +// public ReadGraphCollectionFilter readGraphCollectionCommand( final String collectionName ) { +// return new ReadGraphCollectionFilter( graphManagerFactory, collectionName ); +// } +// +// +// @Override +// public ReadGraphCollectionByIdFilter readGraphCollectionByIdFilter( final String collectionName, +// final Id targetId ) { +// return new ReadGraphCollectionByIdFilter( graphManagerFactory, collectionName, targetId ); +// } +// +// +// @Override +// public ReadGraphConnectionFilter readGraphConnectionCommand( final String connectionName ) { +// return new ReadGraphConnectionFilter( graphManagerFactory, connectionName ); +// } +// +// +// @Override +// public ReadGraphConnectionByTypeFilter readGraphConnectionCommand( final String connectionName, +// final String entityType ) { +// return new ReadGraphConnectionByTypeFilter( graphManagerFactory, connectionName, entityType ); +// } +// +// +// @Override +// public ReadGraphConnectionByIdFilter readGraphConnectionByIdFilter( final String connectionName, +// final Id targetId ) { +// return new ReadGraphConnectionByIdFilter( graphManagerFactory, connectionName, targetId ); +// } +// +// +// @Override +// public EntityLoadCollector entityLoadCollector() { +// return new EntityLoadCollector( entityCollectionManagerFactory ); +// } +// +// +// /** +// * TODO refactor these impls to use RX internally, as well as remove the query object +// */ +// @Override +// public QueryCollectionElasticSearchCollectorFilter queryCollectionElasticSearchCollector( +// 
final String collectionName, final String query ) { +// +// final Query queryObject = Query.fromQL( query ); +// +// final QueryCollectionElasticSearchCollectorFilter filter = +// new QueryCollectionElasticSearchCollectorFilter( entityCollectionManagerFactory, entityIndexFactory, +// collectionName, queryObject ); +// +// return filter; +// } +// +// +// @Override +// public QueryConnectionElasticSearchCollectorFilter queryConnectionElasticSearchCollector( +// final String connectionName, final String query ) { +// +// final Query queryObject = Query.fromQL( query ); +// +// final QueryConnectionElasticSearchCollectorFilter filter = +// new QueryConnectionElasticSearchCollectorFilter( entityCollectionManagerFactory, entityIndexFactory, +// connectionName, queryObject ); +// +// return filter; +// } +// +// +// @Override +// public QueryConnectionElasticSearchCollectorFilter queryConnectionElasticSearchCollector( +// final String connectionName, final String connectionEntityType, final String query ) { +// +// final Query queryObject = Query.fromQL( query ); +// queryObject.setConnectionType( connectionEntityType ); +// +// final QueryConnectionElasticSearchCollectorFilter filter = +// new QueryConnectionElasticSearchCollectorFilter( entityCollectionManagerFactory, entityIndexFactory, +// connectionName, queryObject ); +// +// return filter; +// } +// +// +// @Override +// public EntityIdFilter getEntityIdFilter( final Id entityId ) { +// return new EntityIdFilter( entityId ); +// } } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b5e60e04/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadPipelineBuilder.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadPipelineBuilder.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadPipelineBuilder.java index 5d83dac..9da2b03 100644 
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadPipelineBuilder.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadPipelineBuilder.java @@ -20,15 +20,21 @@ package org.apache.usergrid.corepersistence.pipeline.read; +import org.apache.usergrid.corepersistence.pipeline.PipelineResult; +import org.apache.usergrid.persistence.Entity; import org.apache.usergrid.persistence.Results; import org.apache.usergrid.persistence.model.entity.Id; +import com.google.common.base.Optional; + import rx.Observable; /** - * An instance of a pipeline builder for building commands. - * Each invocation of the method will assemple the underlying pipe and updating it's state + * An instance of a pipeline builder for building commands on our read pipline + * + * Each invocation of the method will assemble the underlying pipe and updating it's state + * * Results are added by invoking execute. */ public interface ReadPipelineBuilder { @@ -38,14 +44,14 @@ public interface ReadPipelineBuilder { * Set the cursor * @param cursor */ - ReadPipelineBuilder withCursor(final String cursor); + ReadPipelineBuilder withCursor(final Optional<String> cursor); /** * Set the limit of our page sizes * @param limit * @return */ - ReadPipelineBuilder withLimit(final int limit); + ReadPipelineBuilder withLimit(final Optional<Integer> limit); /** * An operation to bridge 2.0-> 1.0. 
Should be removed when everyone uses the pipeline @@ -87,24 +93,14 @@ public interface ReadPipelineBuilder { ReadPipelineBuilder getConnection( final String connectionName, final String entityType ); /** - * Get all entities in a connection with a query - */ - ReadPipelineBuilder connectionWithQuery( final String connectionName, final String query ); - - - /** * Get all entities in a connection with a query and a target entity type */ - ReadPipelineBuilder connectionWithQuery( final String connectionName, final String entityType, final String query); - - - - + ReadPipelineBuilder connectionWithQuery( final String connectionName, final Optional<String> entityType, final String query); /** - * Execute final construction of the pipeline and return the results + * Load our entity results when our previous filter calls graph * @return */ - Observable<Results> build(); + Observable<PipelineResult<ResultsPage>> execute(); }
