Repository: incubator-usergrid Updated Branches: refs/heads/USERGRID-641 6d54dffc4 -> aa9153ac8
Updates pipeline and fixes connectionref querying Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/aa9153ac Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/aa9153ac Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/aa9153ac Branch: refs/heads/USERGRID-641 Commit: aa9153ac84a2a7e68dd0c9144ab867c6f75c68a0 Parents: 6d54dff Author: Todd Nine <[email protected]> Authored: Thu May 21 17:10:21 2015 -0600 Committer: Todd Nine <[email protected]> Committed: Thu May 21 17:10:21 2015 -0600 ---------------------------------------------------------------------- .../corepersistence/CpRelationManager.java | 21 ++-- .../pipeline/FilterPipeline.java | 107 ------------------ .../corepersistence/pipeline/Pipeline.java | 110 +++++++++++++++++++ .../pipeline/PipelineModule.java | 9 +- .../pipeline/builder/CandidateBuilder.java | 12 +- .../pipeline/builder/ConnectionBuilder.java | 37 ------- .../pipeline/builder/ConnectionRefBuilder.java | 6 +- .../pipeline/builder/EntityBuilder.java | 13 ++- .../pipeline/builder/IdBuilder.java | 25 ++--- .../pipeline/builder/PipelineBuilder.java | 6 +- .../results/ConnectionRefQueryExecutor.java | 60 ++++++++++ .../results/EntityQueryExecutor.java | 84 ++++++++++++++ .../results/ObservableQueryExecutor.java | 52 +++------ .../corepersistence/results/QueryExecutor.java | 1 + 14 files changed, 320 insertions(+), 223 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/aa9153ac/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java index 1c34929..be143ce 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java @@ -31,12 +31,12 @@ import org.slf4j.LoggerFactory; import org.springframework.util.Assert; import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService; -import org.apache.usergrid.corepersistence.pipeline.FilterPipeline; import org.apache.usergrid.corepersistence.pipeline.builder.EntityBuilder; import org.apache.usergrid.corepersistence.pipeline.builder.IdBuilder; import org.apache.usergrid.corepersistence.pipeline.builder.PipelineBuilderFactory; -import org.apache.usergrid.corepersistence.pipeline.read.FilterResult; import org.apache.usergrid.corepersistence.pipeline.read.ResultsPage; +import org.apache.usergrid.corepersistence.results.ConnectionRefQueryExecutor; +import org.apache.usergrid.corepersistence.results.EntityQueryExecutor; import org.apache.usergrid.corepersistence.results.ObservableQueryExecutor; import org.apache.usergrid.corepersistence.util.CpEntityMapUtils; import org.apache.usergrid.corepersistence.util.CpNamingUtils; @@ -640,11 +640,11 @@ public class CpRelationManager implements RelationManager { } else { final String entityType = collection.getType(); - results = pipelineBuilder.searchCollection( collectionName, entityType, query.getQl().get() ).loadEntities(); + results = pipelineBuilder.searchCollection( collectionName, query.getQl().get() , entityType).loadEntities(); } - return new ObservableQueryExecutor( results.build() ).next(); + return new EntityQueryExecutor( results.build() ).next(); } @@ -936,21 +936,24 @@ public class CpRelationManager implements RelationManager { if(query.getResultsLevel() == Level.REFS){ - final Observable<ResultsPage<ConnectionRef>> results; + final IdBuilder traversedIds; if(query.isGraphSearch()){ - results = pipelineBuilder.traverseConnection( connection, entityType ).loadConnectionRefs( cpHeadEntity.getId(), connection ).build(); + traversedIds = pipelineBuilder.traverseConnection( connection, entityType ); } else { - results = pipelineBuilder.searchConnection( connection, query.getQl().get(),entityType) .loadIds().loadConnectionRefs( cpHeadEntity.getId(), connection ).build(); + traversedIds = pipelineBuilder.searchConnection( connection, query.getQl().get(), entityType ).loadIds(); } - throw new UnsupportedOperationException( "Implement me" ); + final Observable<ResultsPage<ConnectionRef>> results = traversedIds.loadConnectionRefs( + cpHeadEntity.getId(), connection ).build(); + + return new ConnectionRefQueryExecutor( results ).next(); } @@ -978,7 +981,7 @@ public class CpRelationManager implements RelationManager { - return new ObservableQueryExecutor( results ).next(); + return new EntityQueryExecutor( results ).next(); } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/aa9153ac/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/FilterPipeline.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/FilterPipeline.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/FilterPipeline.java deleted file mode 100644 index 089f47d..0000000 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/FilterPipeline.java +++ /dev/null @@ -1,107 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.usergrid.corepersistence.pipeline; - - -import org.apache.usergrid.corepersistence.pipeline.cursor.RequestCursor; -import org.apache.usergrid.corepersistence.pipeline.read.FilterResult; -import org.apache.usergrid.persistence.core.scope.ApplicationScope; -import org.apache.usergrid.persistence.core.util.ValidationUtils; -import org.apache.usergrid.persistence.model.entity.Id; - -import com.google.common.base.Optional; -import com.google.common.base.Preconditions; - -import rx.Observable; - - -/** - * Pipeline for applying our UG domain specific filters. - * - * Modeled after an observable, with typing to allow input of specific filters - * - * @param InputType the input type in the current pipeline state - */ -public class FilterPipeline<InputType> { - - - private int idCount = 0; - - private final ApplicationScope applicationScope; - - - private final RequestCursor requestCursor; - private int limit; - - //Generics hell, intentionally without a generic, we check at the filter level - private Observable currentObservable; - - - /** - * Create our filter pipeline - */ - public FilterPipeline( final ApplicationScope applicationScope, final Optional<String> cursor, final int limit ) { - - - ValidationUtils.validateApplicationScope( applicationScope ); - Preconditions.checkNotNull( cursor, "cursor optional is required" ); - Preconditions.checkArgument( limit > 0, "limit must be > 0" ); - - - this.applicationScope = applicationScope; - - //init our cursor to empty - this.requestCursor = new RequestCursor( cursor ); - - //set the default limit - this.limit = limit; - - //set our observable to start at the application - final FilterResult<Id> filter = new FilterResult<>( applicationScope.getApplication(), Optional.absent() ); - this.currentObservable = Observable.just( filter ); - } - - - public <OutputType> FilterPipeline<OutputType> withFilter( - final PipelineOperation<? super InputType, ? extends OutputType> filter ) { - - - - final PipelineContext context = new PipelineContext( applicationScope, requestCursor, limit, idCount ); - - filter.setContext( context ); - - //done for clarity - idCount++; - - return ( FilterPipeline<OutputType> ) this; - } - - - - /** - * Return the observable of the filter pipeline - */ - public Observable<InputType> execute() { - return currentObservable; - } - - -} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/aa9153ac/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/Pipeline.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/Pipeline.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/Pipeline.java new file mode 100644 index 0000000..dc95178 --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/Pipeline.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.corepersistence.pipeline; + + +import org.apache.usergrid.corepersistence.pipeline.cursor.RequestCursor; +import org.apache.usergrid.corepersistence.pipeline.read.FilterResult; +import org.apache.usergrid.persistence.core.scope.ApplicationScope; +import org.apache.usergrid.persistence.core.util.ValidationUtils; +import org.apache.usergrid.persistence.model.entity.Id; + +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; + +import rx.Observable; + + +/** + * Pipeline for applying our UG domain specific filters. + * + * Modeled after an observable, with typing to allow input of specific filters + * + * @param InputType the input type in the current pipeline state + */ +public class Pipeline<InputType> { + + + private int idCount = 0; + + private final ApplicationScope applicationScope; + + + private final RequestCursor requestCursor; + private int limit; + + //Generics hell, intentionally without a generic, we check at the filter level + private Observable currentObservable; + + + /** + * Create our filter pipeline + */ + public Pipeline( final ApplicationScope applicationScope, final Optional<String> cursor, final int limit ) { + + + ValidationUtils.validateApplicationScope( applicationScope ); + Preconditions.checkNotNull( cursor, "cursor optional is required" ); + Preconditions.checkArgument( limit > 0, "limit must be > 0" ); + + + this.applicationScope = applicationScope; + + //init our cursor to empty + this.requestCursor = new RequestCursor( cursor ); + + //set the default limit + this.limit = limit; + + //set our observable to start at the application + final FilterResult<Id> filter = new FilterResult<>( applicationScope.getApplication(), Optional.absent() ); + this.currentObservable = Observable.just( filter ); + } + + + public <OutputType> Pipeline<OutputType> withFilter( + final PipelineOperation<? super InputType, ? extends OutputType> filter ) { + + + + final PipelineContext context = new PipelineContext( applicationScope, requestCursor, limit, idCount ); + + filter.setContext( context ); + + //update the observable + this.currentObservable = currentObservable.compose( filter ); + + //done for clarity + idCount++; + + return ( Pipeline<OutputType> ) this; + } + + + + /** + * Return the observable of the filter pipeline + */ + public Observable<InputType> execute() { + return currentObservable; + } + + +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/aa9153ac/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineModule.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineModule.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineModule.java index 8ec8704..93df066 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineModule.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineModule.java @@ -20,6 +20,7 @@ package org.apache.usergrid.corepersistence.pipeline; +import org.apache.usergrid.corepersistence.pipeline.builder.PipelineBuilderFactory; import org.apache.usergrid.corepersistence.pipeline.read.FilterFactory; import com.google.inject.AbstractModule; @@ -33,15 +34,11 @@ public class PipelineModule extends AbstractModule { @Override protected void configure() { - //Use Guice to create the builder since we don't really need to do anything - //other than DI when creating the filters -// bind( FilterFactory.class ).to( ReadFilterFactoryImpl.class ); - - -// install( new Factory) //Use Guice to create the builder since we don't really need to do anything //other than DI when creating the filters install( new FactoryModuleBuilder().build( FilterFactory.class ) ); + + install( new FactoryModuleBuilder().build( PipelineBuilderFactory.class ) ); } } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/aa9153ac/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/CandidateBuilder.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/CandidateBuilder.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/CandidateBuilder.java index 5cb2eab..9354127 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/CandidateBuilder.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/CandidateBuilder.java @@ -21,7 +21,7 @@ package org.apache.usergrid.corepersistence.pipeline.builder; import org.apache.usergrid.corepersistence.pipeline.read.FilterFactory; -import org.apache.usergrid.corepersistence.pipeline.FilterPipeline; +import org.apache.usergrid.corepersistence.pipeline.Pipeline; import org.apache.usergrid.corepersistence.pipeline.read.FilterResult; import org.apache.usergrid.corepersistence.pipeline.read.search.Candidate; import org.apache.usergrid.persistence.model.entity.Entity; @@ -31,13 +31,13 @@ import org.apache.usergrid.persistence.model.entity.Id; public class CandidateBuilder { - private final FilterPipeline<FilterResult<Candidate>> filterPipeline; + private final Pipeline<FilterResult<Candidate>> pipeline; private final FilterFactory filterFactory; - public CandidateBuilder( final FilterPipeline<FilterResult<Candidate>> filterPipeline, + public CandidateBuilder( final Pipeline<FilterResult<Candidate>> pipeline, final FilterFactory filterFactory ) { - this.filterPipeline = filterPipeline; + this.pipeline = pipeline; this.filterFactory = filterFactory; } @@ -48,7 +48,7 @@ public class CandidateBuilder { */ public IdBuilder loadIds(){ - final FilterPipeline<FilterResult<Id>> newFilter = filterPipeline.withFilter( filterFactory.candidateResultsIdVerifyFilter() ); + final Pipeline<FilterResult<Id>> newFilter = pipeline.withFilter( filterFactory.candidateResultsIdVerifyFilter() ); return new IdBuilder( newFilter, filterFactory ); } @@ -60,7 +60,7 @@ public class CandidateBuilder { */ public EntityBuilder loadEntities(){ - final FilterPipeline<FilterResult<Entity>> newFilter = filterPipeline.withFilter( filterFactory.candidateEntityFilter() ); + final Pipeline<FilterResult<Entity>> newFilter = pipeline.withFilter( filterFactory.candidateEntityFilter() ); return new EntityBuilder(newFilter ); } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/aa9153ac/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/ConnectionBuilder.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/ConnectionBuilder.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/ConnectionBuilder.java deleted file mode 100644 index b4ea94e..0000000 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/ConnectionBuilder.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.usergrid.corepersistence.pipeline.builder; - - -import org.apache.usergrid.corepersistence.pipeline.read.ResultsPage; -import org.apache.usergrid.persistence.ConnectionRef; - -import rx.Observable; - - -public class ConnectionBuilder { - - - - public Observable<ResultsPage<ConnectionRef>> build(){ - return null; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/aa9153ac/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/ConnectionRefBuilder.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/ConnectionRefBuilder.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/ConnectionRefBuilder.java index 6c0ebc8..362f2c6 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/ConnectionRefBuilder.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/ConnectionRefBuilder.java @@ -20,7 +20,7 @@ package org.apache.usergrid.corepersistence.pipeline.builder; -import org.apache.usergrid.corepersistence.pipeline.FilterPipeline; +import org.apache.usergrid.corepersistence.pipeline.Pipeline; import org.apache.usergrid.corepersistence.pipeline.read.FilterResult; import org.apache.usergrid.corepersistence.pipeline.read.ResultsPage; import org.apache.usergrid.corepersistence.pipeline.read.collect.ResultsPageCollector; @@ -36,9 +36,9 @@ import rx.Observable; public class ConnectionRefBuilder { - private final FilterPipeline<FilterResult<ConnectionRef>> connectionRefFilter; + private final Pipeline<FilterResult<ConnectionRef>> connectionRefFilter; - public ConnectionRefBuilder( final FilterPipeline<FilterResult<ConnectionRef>> connectionRefFilter ) { + public ConnectionRefBuilder( final Pipeline<FilterResult<ConnectionRef>> connectionRefFilter ) { this.connectionRefFilter = connectionRefFilter; } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/aa9153ac/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/EntityBuilder.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/EntityBuilder.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/EntityBuilder.java index 07b4586..b120c56 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/EntityBuilder.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/EntityBuilder.java @@ -20,9 +20,11 @@ package org.apache.usergrid.corepersistence.pipeline.builder; -import org.apache.usergrid.corepersistence.pipeline.FilterPipeline; +import org.apache.usergrid.corepersistence.pipeline.Pipeline; import org.apache.usergrid.corepersistence.pipeline.read.FilterResult; import org.apache.usergrid.corepersistence.pipeline.read.ResultsPage; +import org.apache.usergrid.corepersistence.pipeline.read.collect.EntityResumeFilter; +import org.apache.usergrid.corepersistence.pipeline.read.collect.ResultsPageCollector; import org.apache.usergrid.persistence.model.entity.Entity; import rx.Observable; @@ -33,11 +35,11 @@ import rx.Observable; */ public class EntityBuilder { - private final FilterPipeline<FilterResult<Entity>> filterPipeline; + private final Pipeline<FilterResult<Entity>> pipeline; - public EntityBuilder( final FilterPipeline<FilterResult<Entity>> filterPipeline ) { - this.filterPipeline = filterPipeline; + public EntityBuilder( final Pipeline<FilterResult<Entity>> pipeline ) { + this.pipeline = pipeline; } @@ -46,6 +48,7 @@ public class EntityBuilder { * @return */ public Observable<ResultsPage<Entity>> build(){ - return null; + //we must add our resume filter so we drop our previous page first element if it's present + return pipeline.withFilter( new EntityResumeFilter() ).withFilter( new ResultsPageCollector<>() ).execute(); } } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/aa9153ac/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/IdBuilder.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/IdBuilder.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/IdBuilder.java index 12a89ba..4291ea9 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/IdBuilder.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/IdBuilder.java @@ -22,12 +22,11 @@ package org.apache.usergrid.corepersistence.pipeline.builder; import org.apache.usergrid.corepersistence.pipeline.PipelineOperation; import org.apache.usergrid.corepersistence.pipeline.read.FilterFactory; -import org.apache.usergrid.corepersistence.pipeline.FilterPipeline; +import org.apache.usergrid.corepersistence.pipeline.Pipeline; import org.apache.usergrid.corepersistence.pipeline.read.FilterResult; import org.apache.usergrid.corepersistence.pipeline.read.collect.ConnectionRefFilter; import org.apache.usergrid.corepersistence.pipeline.read.collect.ConnectionRefResumeFilter; import org.apache.usergrid.corepersistence.pipeline.read.search.Candidate; -import org.apache.usergrid.corepersistence.pipeline.read.traverse.ReadGraphConnectionByTypeFilter; import org.apache.usergrid.persistence.ConnectionRef; import org.apache.usergrid.persistence.model.entity.Entity; import org.apache.usergrid.persistence.model.entity.Id; @@ -42,11 +41,11 @@ public class IdBuilder { private final FilterFactory filterFactory; - private final FilterPipeline<FilterResult<Id>> filterPipeline; + private final Pipeline<FilterResult<Id>> pipeline; - public IdBuilder( final FilterPipeline<FilterResult<Id>> filterPipeline, final FilterFactory filterFactory ) { - this.filterPipeline = filterPipeline; + public IdBuilder( final Pipeline<FilterResult<Id>> pipeline, final FilterFactory filterFactory ) { + this.pipeline = pipeline; this.filterFactory = filterFactory; } @@ -56,8 +55,8 @@ public class IdBuilder { * @return */ public EntityBuilder loadEntities() { - final FilterPipeline<FilterResult<Entity>> pipeline = - filterPipeline.withFilter( filterFactory.entityLoadFilter() ); + final Pipeline<FilterResult<Entity>> pipeline = + this.pipeline.withFilter( filterFactory.entityLoadFilter() ); return new EntityBuilder( pipeline ); } @@ -69,8 +68,8 @@ public class IdBuilder { * @return */ public IdBuilder traverseCollection( final String collectionName ) { - final FilterPipeline<FilterResult<Id>> newFilter = - filterPipeline.withFilter( filterFactory.readGraphCollectionFilter( collectionName ) ); + final Pipeline<FilterResult<Id>> newFilter = + pipeline.withFilter( filterFactory.readGraphCollectionFilter( collectionName ) ); return new IdBuilder( newFilter, filterFactory ); } @@ -93,7 +92,7 @@ public class IdBuilder { } - return new IdBuilder( filterPipeline.withFilter(filter ), filterFactory ); + return new IdBuilder( pipeline.withFilter(filter ), filterFactory ); } @@ -106,7 +105,7 @@ public class IdBuilder { */ public CandidateBuilder searchCollection( final String collectionName, final String ql, final String entityType ) { - final FilterPipeline<FilterResult<Candidate>> newFilter = filterPipeline.withFilter( filterFactory.searchCollectionFilter( + final Pipeline<FilterResult<Candidate>> newFilter = pipeline.withFilter( filterFactory.searchCollectionFilter( ql, collectionName, entityType ) ); return new CandidateBuilder( newFilter, filterFactory ); @@ -123,7 +122,7 @@ public class IdBuilder { public CandidateBuilder searchConnection( final String connectionName, final String ql , final Optional<String> entityType) { - final FilterPipeline<FilterResult<Candidate>> newFilter = filterPipeline.withFilter( filterFactory.searchConnectionFilter( + final Pipeline<FilterResult<Candidate>> newFilter = pipeline.withFilter( filterFactory.searchConnectionFilter( ql, connectionName, entityType ) ); return new CandidateBuilder( newFilter, filterFactory ); @@ -139,7 +138,7 @@ public class IdBuilder { @Deprecated public ConnectionRefBuilder loadConnectionRefs(final Id sourceId, final String connectionType){ - final FilterPipeline<FilterResult<ConnectionRef>> connectionRefFilter = filterPipeline.withFilter( new ConnectionRefFilter(sourceId, connectionType ) ).withFilter( + final Pipeline<FilterResult<ConnectionRef>> connectionRefFilter = pipeline.withFilter( new ConnectionRefFilter(sourceId, connectionType ) ).withFilter( new ConnectionRefResumeFilter() ); return new ConnectionRefBuilder(connectionRefFilter); } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/aa9153ac/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/PipelineBuilder.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/PipelineBuilder.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/PipelineBuilder.java index 488e9c1..f1a44ea 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/PipelineBuilder.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/PipelineBuilder.java @@ -21,7 +21,7 @@ package org.apache.usergrid.corepersistence.pipeline.builder; import org.apache.usergrid.corepersistence.pipeline.read.FilterFactory; -import org.apache.usergrid.corepersistence.pipeline.FilterPipeline; +import org.apache.usergrid.corepersistence.pipeline.Pipeline; import org.apache.usergrid.corepersistence.pipeline.read.FilterResult; import org.apache.usergrid.persistence.core.scope.ApplicationScope; import org.apache.usergrid.persistence.model.entity.Id; @@ -91,9 +91,9 @@ public class PipelineBuilder { */ @Deprecated public IdBuilder fromId(final Id entityId){ - FilterPipeline<FilterResult<Id>> filterPipeline = new FilterPipeline( applicationScope, this.cursor,limit ).withFilter( filterFactory.getEntityIdFilter( entityId ) ); + Pipeline<FilterResult<Id>> pipeline = new Pipeline( applicationScope, this.cursor,limit ).withFilter( filterFactory.getEntityIdFilter( entityId ) ); - return new IdBuilder( filterPipeline, filterFactory ); + return new IdBuilder( pipeline, filterFactory ); } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/aa9153ac/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ConnectionRefQueryExecutor.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ConnectionRefQueryExecutor.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ConnectionRefQueryExecutor.java new file mode 100644 index 0000000..798c9c7 --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ConnectionRefQueryExecutor.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.corepersistence.results; + + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import org.apache.usergrid.corepersistence.pipeline.read.ResultsPage; +import org.apache.usergrid.corepersistence.util.CpEntityMapUtils; +import org.apache.usergrid.persistence.ConnectionRef; +import org.apache.usergrid.persistence.EntityFactory; +import org.apache.usergrid.persistence.Results; +import org.apache.usergrid.persistence.model.entity.Entity; +import org.apache.usergrid.persistence.model.entity.Id; + +import rx.Observable; + + +/** + * Processes our results of entities and turns them into + */ +@Deprecated//Required for 1.0 compatibility +public class ConnectionRefQueryExecutor extends ObservableQueryExecutor<ConnectionRef> { + + + public ConnectionRefQueryExecutor( final Observable<ResultsPage<ConnectionRef>> resultsObservable ) { + super( resultsObservable ); + } + + + @Override + protected Results createResults( final ResultsPage resultsPage ) { + final List<ConnectionRef> connectionRefs = resultsPage.getEntityList(); + + final Results results = Results.fromConnections(connectionRefs); + + return results; + } + + +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/aa9153ac/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/EntityQueryExecutor.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/EntityQueryExecutor.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/EntityQueryExecutor.java new file mode 100644 index 0000000..bc9001e --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/EntityQueryExecutor.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.corepersistence.results; + + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import org.apache.usergrid.corepersistence.pipeline.read.ResultsPage; +import org.apache.usergrid.corepersistence.util.CpEntityMapUtils; +import org.apache.usergrid.persistence.EntityFactory; +import org.apache.usergrid.persistence.Results; +import org.apache.usergrid.persistence.model.entity.Entity; +import org.apache.usergrid.persistence.model.entity.Id; + +import rx.Observable; + + +/** + * Processes our results of entities and turns them into + */ +@Deprecated//Required for 1.0 compatibility +public class EntityQueryExecutor extends ObservableQueryExecutor<Entity> { + + + public EntityQueryExecutor( final Observable<ResultsPage<Entity>> resultsObservable ) { + super( resultsObservable ); + } + + + @Override + protected Results createResults( final ResultsPage resultsPage ) { + + final List<Entity> entityList = resultsPage.getEntityList(); + final List<org.apache.usergrid.persistence.Entity> resultsEntities = new ArrayList<>( entityList.size() ); + + + for ( final Entity entity : entityList ) { + resultsEntities.add( mapEntity( entity ) ); + } + + final Results results = Results.fromEntities( resultsEntities ); + + return results; + } + + + /** + * + * @param cpEntity + * @return + */ + private org.apache.usergrid.persistence.Entity mapEntity( final Entity cpEntity ) { + + + final Id entityId = cpEntity.getId(); + + org.apache.usergrid.persistence.Entity entity = + EntityFactory.newEntity( entityId.getUuid(), entityId.getType() ); + + Map<String, Object> entityMap = CpEntityMapUtils.toMap( cpEntity ); + entity.addProperties( entityMap ); + + return entity; + } +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/aa9153ac/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ObservableQueryExecutor.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ObservableQueryExecutor.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ObservableQueryExecutor.java index c779bb7..ff44416 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ObservableQueryExecutor.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ObservableQueryExecutor.java @@ -20,9 +20,7 @@ package org.apache.usergrid.corepersistence.results; -import java.util.ArrayList; import java.util.Iterator; -import java.util.List; import java.util.Map; import java.util.NoSuchElementException; @@ -42,53 +40,42 @@ import rx.Observable; * Our proxy to allow us to subscribe to observable results, then return them as an iterator. A bridge for 2.0 -> 1.0 * code. This should not be used on any new code, and will eventually be deleted */ -public class ObservableQueryExecutor implements QueryExecutor { +@Deprecated//Required for 1.0 compatibility +public abstract class ObservableQueryExecutor<T> implements QueryExecutor { private final Observable<Results> resultsObservable; public Iterator<Results> iterator; - public ObservableQueryExecutor( final Observable<ResultsPage<Entity>> resultsObservable) { - //map to our old results objects, return a default empty if required - this.resultsObservable = resultsObservable.map( resultsPage -> createResults( resultsPage ) ).defaultIfEmpty( new Results() ); + public ObservableQueryExecutor( final Observable<ResultsPage<T>> resultsObservable ) { + //map to our old results objects, return a default empty if required + this.resultsObservable = resultsObservable.map( resultsPage -> createResultsInternal( resultsPage ) ) + .defaultIfEmpty( new Results() ); } /** - * - * @param cpEntity + * Transform the results + * @param resultsPage * @return */ - private org.apache.usergrid.persistence.Entity mapEntity( final Entity cpEntity ) { + protected abstract Results createResults( final ResultsPage resultsPage ); - final Id entityId = cpEntity.getId(); - org.apache.usergrid.persistence.Entity entity = - EntityFactory.newEntity( entityId.getUuid(), entityId.getType() ); - - Map<String, Object> entityMap = CpEntityMapUtils.toMap( cpEntity ); - entity.addProperties( entityMap ); - - return entity; - } - - private Results createResults( final ResultsPage resultsPage ){ - - final List<Entity> entityList = resultsPage.getEntityList(); - final List<org.apache.usergrid.persistence.Entity> resultsEntities = new ArrayList<>( entityList.size() ); - - - for(final Entity entity: entityList){ - resultsEntities.add( mapEntity( entity ) ); - } + /** + * Legacy to transform our results page to a new results + * @param resultsPage + * @return + */ + private Results createResultsInternal( final ResultsPage resultsPage ) { - final Results results = Results.fromEntities( resultsEntities ); + final Results results = createResults( resultsPage ); //add the cursor if our limit is the same - if(resultsPage.hasMoreResults()) { + if ( resultsPage.hasMoreResults() ) { final Optional<String> cursor = resultsPage.getResponseCursor().encodeAsString(); if ( cursor.isPresent() ) { @@ -96,11 +83,10 @@ public class ObservableQueryExecutor implements QueryExecutor { } } return results; - - } + @Override public Iterator<Results> iterator() { return this; @@ -130,6 +116,4 @@ public class ObservableQueryExecutor implements QueryExecutor { return next; } - - } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/aa9153ac/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/QueryExecutor.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/QueryExecutor.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/QueryExecutor.java index 3afb77f..6bdf162 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/QueryExecutor.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/QueryExecutor.java @@ -31,6 +31,7 @@ import org.apache.usergrid.persistence.Results; * * QueryExecutor.next() should always return a non-null Results object */ +@Deprecated//Required for 1.0 compatibility public interface QueryExecutor extends Iterable<Results>, Iterator<Results> {
