Finishes testing of connections
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/b59abac3 Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/b59abac3 Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/b59abac3 Branch: refs/heads/two-dot-o-dev Commit: b59abac30fbdb001ef2d2bfbc4e7952e226ebc51 Parents: aa9153a Author: Todd Nine <tn...@apigee.com> Authored: Thu May 21 17:28:04 2015 -0600 Committer: Todd Nine <tn...@apigee.com> Committed: Thu May 21 17:28:04 2015 -0600 ---------------------------------------------------------------------- .../corepersistence/CpRelationManager.java | 32 ++++------ .../pipeline/builder/IdBuilder.java | 5 ++ .../pipeline/read/collect/IdResumeFilter.java | 61 ++++++++++++++++++++ .../results/ConnectionRefQueryExecutor.java | 2 +- .../results/EntityQueryExecutor.java | 2 +- .../results/ObservableQueryExecutor.java | 4 +- 6 files changed, 81 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b59abac3/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java index be143ce..c766a1b 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java @@ -37,7 +37,6 @@ import org.apache.usergrid.corepersistence.pipeline.builder.PipelineBuilderFacto import org.apache.usergrid.corepersistence.pipeline.read.ResultsPage; import org.apache.usergrid.corepersistence.results.ConnectionRefQueryExecutor; import org.apache.usergrid.corepersistence.results.EntityQueryExecutor; -import org.apache.usergrid.corepersistence.results.ObservableQueryExecutor; import org.apache.usergrid.corepersistence.util.CpEntityMapUtils; import org.apache.usergrid.corepersistence.util.CpNamingUtils; import org.apache.usergrid.persistence.ConnectedEntityRef; @@ -268,7 +267,7 @@ public class CpRelationManager implements RelationManager { GraphManager gm = managerCache.getGraphManager( applicationScope ); Observable<Edge> edges = gm.loadEdgeVersions( new SimpleSearchByEdge( new SimpleId( headEntity.getUuid(), headEntity.getType() ), edgeType, entityId, - Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING, null ) ); + Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING, Optional.absent() ) ); return edges.toBlocking().firstOrDefault( null ) != null; } @@ -933,37 +932,28 @@ public class CpRelationManager implements RelationManager { cpHeadEntity.getId() ); - - - if(query.getResultsLevel() == Level.REFS){ + if ( query.getResultsLevel() == Level.REFS || query.getResultsLevel() == Level.IDS ) { final IdBuilder traversedIds; - if(query.isGraphSearch()){ - - traversedIds = pipelineBuilder.traverseConnection( connection, entityType ); - + if ( query.isGraphSearch() ) { + traversedIds = pipelineBuilder.traverseConnection( connection, entityType ); } - else - { - traversedIds = pipelineBuilder.searchConnection( connection, query.getQl().get(), entityType ).loadIds(); - + else { + traversedIds = + pipelineBuilder.searchConnection( connection, query.getQl().get(), entityType ).loadIds(); } - final Observable<ResultsPage<ConnectionRef>> results = traversedIds.loadConnectionRefs( - cpHeadEntity.getId(), connection ).build(); + //create connection refs - return new ConnectionRefQueryExecutor( results ).next(); + final Observable<ResultsPage<ConnectionRef>> results = + traversedIds.loadConnectionRefs( cpHeadEntity.getId(), connection ).build(); + return new ConnectionRefQueryExecutor( results ).next(); } - if(query.getResultsLevel() == Level.IDS){ - - throw new UnsupportedOperationException( "Not yet implemented" ); - } - //we want to load all entities http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b59abac3/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/IdBuilder.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/IdBuilder.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/IdBuilder.java index 4291ea9..0f784a6 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/IdBuilder.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/IdBuilder.java @@ -24,8 +24,11 @@ import org.apache.usergrid.corepersistence.pipeline.PipelineOperation; import org.apache.usergrid.corepersistence.pipeline.read.FilterFactory; import org.apache.usergrid.corepersistence.pipeline.Pipeline; import org.apache.usergrid.corepersistence.pipeline.read.FilterResult; +import org.apache.usergrid.corepersistence.pipeline.read.ResultsPage; import org.apache.usergrid.corepersistence.pipeline.read.collect.ConnectionRefFilter; import org.apache.usergrid.corepersistence.pipeline.read.collect.ConnectionRefResumeFilter; +import org.apache.usergrid.corepersistence.pipeline.read.collect.IdResumeFilter; +import org.apache.usergrid.corepersistence.pipeline.read.collect.ResultsPageCollector; import org.apache.usergrid.corepersistence.pipeline.read.search.Candidate; import org.apache.usergrid.persistence.ConnectionRef; import org.apache.usergrid.persistence.model.entity.Entity; @@ -33,6 +36,8 @@ import org.apache.usergrid.persistence.model.entity.Id; import com.google.common.base.Optional; +import rx.Observable; + /** * A builder to transition from emitting Ids in the pipeline into other operations http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b59abac3/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/IdResumeFilter.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/IdResumeFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/IdResumeFilter.java new file mode 100644 index 0000000..e9fd8de --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/IdResumeFilter.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.corepersistence.pipeline.read.collect; + + +import org.apache.usergrid.corepersistence.pipeline.cursor.CursorSerializer; +import org.apache.usergrid.corepersistence.pipeline.read.AbstractPathFilter; +import org.apache.usergrid.corepersistence.pipeline.read.FilterResult; +import org.apache.usergrid.persistence.model.entity.Entity; +import org.apache.usergrid.persistence.model.entity.Id; + +import com.google.common.base.Optional; + +import rx.Observable; + + +/** + * A filter that is used when we can potentially serialize pages via cursor. This will filter the first result, only if + * it matches the Id that was set + */ +public class IdResumeFilter extends AbstractPathFilter<Id, Id, Id> { + + + @Override + public Observable<FilterResult<Id>> call( final Observable<FilterResult<Id>> filterResultObservable ) { + + //filter only the first id, then map into our path for our next pass + + + //skip our first and emit if neccessary + return filterResultObservable.skipWhile( filterResult -> { + + final Optional<Id> startFromCursor = getSeekValue(); + + return startFromCursor.isPresent() && startFromCursor.get().equals( filterResult.getValue() ); + } ); + } + + + @Override + protected CursorSerializer<Id> getCursorSerializer() { + return IdCursorSerializer.INSTANCE; + } +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b59abac3/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ConnectionRefQueryExecutor.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ConnectionRefQueryExecutor.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ConnectionRefQueryExecutor.java index 798c9c7..3dfd98a 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ConnectionRefQueryExecutor.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ConnectionRefQueryExecutor.java @@ -36,7 +36,7 @@ import rx.Observable; /** - * Processes our results of entities and turns them into + * Processes our results of connection refs */ @Deprecated//Required for 1.0 compatibility public class ConnectionRefQueryExecutor extends ObservableQueryExecutor<ConnectionRef> { http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b59abac3/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/EntityQueryExecutor.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/EntityQueryExecutor.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/EntityQueryExecutor.java index bc9001e..0e18e31 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/EntityQueryExecutor.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/EntityQueryExecutor.java @@ -35,7 +35,7 @@ import rx.Observable; /** - * Processes our results of entities and turns them into + * Processes our results of entities */ @Deprecated//Required for 1.0 compatibility public class EntityQueryExecutor extends ObservableQueryExecutor<Entity> { http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b59abac3/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ObservableQueryExecutor.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ObservableQueryExecutor.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ObservableQueryExecutor.java index ff44416..fce1fb2 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ObservableQueryExecutor.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ObservableQueryExecutor.java @@ -60,7 +60,7 @@ public abstract class ObservableQueryExecutor<T> implements QueryExecutor { * @param resultsPage * @return */ - protected abstract Results createResults( final ResultsPage resultsPage ); + protected abstract Results createResults( final ResultsPage<T> resultsPage ); @@ -69,7 +69,7 @@ public abstract class ObservableQueryExecutor<T> implements QueryExecutor { * @param resultsPage * @return */ - private Results createResultsInternal( final ResultsPage resultsPage ) { + private Results createResultsInternal( final ResultsPage<T> resultsPage ) { final Results results = createResults( resultsPage );