Finishes testing of connections

Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/b59abac3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/b59abac3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/b59abac3

Branch: refs/heads/two-dot-o-dev
Commit: b59abac30fbdb001ef2d2bfbc4e7952e226ebc51
Parents: aa9153a
Author: Todd Nine <tn...@apigee.com>
Authored: Thu May 21 17:28:04 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Thu May 21 17:28:04 2015 -0600

----------------------------------------------------------------------
 .../corepersistence/CpRelationManager.java      | 32 ++++------
 .../pipeline/builder/IdBuilder.java             |  5 ++
 .../pipeline/read/collect/IdResumeFilter.java   | 61 ++++++++++++++++++++
 .../results/ConnectionRefQueryExecutor.java     |  2 +-
 .../results/EntityQueryExecutor.java            |  2 +-
 .../results/ObservableQueryExecutor.java        |  4 +-
 6 files changed, 81 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b59abac3/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
index be143ce..c766a1b 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
@@ -37,7 +37,6 @@ import 
org.apache.usergrid.corepersistence.pipeline.builder.PipelineBuilderFacto
 import org.apache.usergrid.corepersistence.pipeline.read.ResultsPage;
 import org.apache.usergrid.corepersistence.results.ConnectionRefQueryExecutor;
 import org.apache.usergrid.corepersistence.results.EntityQueryExecutor;
-import org.apache.usergrid.corepersistence.results.ObservableQueryExecutor;
 import org.apache.usergrid.corepersistence.util.CpEntityMapUtils;
 import org.apache.usergrid.corepersistence.util.CpNamingUtils;
 import org.apache.usergrid.persistence.ConnectedEntityRef;
@@ -268,7 +267,7 @@ public class CpRelationManager implements RelationManager {
         GraphManager gm = managerCache.getGraphManager( applicationScope );
         Observable<Edge> edges = gm.loadEdgeVersions(
             new SimpleSearchByEdge( new SimpleId( headEntity.getUuid(), 
headEntity.getType() ), edgeType, entityId,
-                Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING, null ) );
+                Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING, 
Optional.absent() ) );
 
         return edges.toBlocking().firstOrDefault( null ) != null;
     }
@@ -933,37 +932,28 @@ public class CpRelationManager implements RelationManager 
{
             cpHeadEntity.getId() );
 
 
-
-
-        if(query.getResultsLevel() == Level.REFS){
+        if ( query.getResultsLevel() == Level.REFS || query.getResultsLevel() 
== Level.IDS ) {
 
             final IdBuilder traversedIds;
-            if(query.isGraphSearch()){
-
-               traversedIds = pipelineBuilder.traverseConnection( connection, 
entityType );
-
 
+            if ( query.isGraphSearch() ) {
+                traversedIds = pipelineBuilder.traverseConnection( connection, 
entityType );
             }
-            else
-            {
-                traversedIds = pipelineBuilder.searchConnection( connection, 
query.getQl().get(), entityType ).loadIds();
-
+            else {
+                traversedIds =
+                    pipelineBuilder.searchConnection( connection, 
query.getQl().get(), entityType ).loadIds();
             }
 
-            final Observable<ResultsPage<ConnectionRef>> results = 
traversedIds.loadConnectionRefs(
-                cpHeadEntity.getId(), connection ).build();
+            //create connection refs
 
-            return new ConnectionRefQueryExecutor( results ).next();
+            final Observable<ResultsPage<ConnectionRef>> results =
+                traversedIds.loadConnectionRefs( cpHeadEntity.getId(), 
connection ).build();
 
+            return new ConnectionRefQueryExecutor( results ).next();
         }
 
 
 
-        if(query.getResultsLevel() == Level.IDS){
-
-            throw new UnsupportedOperationException( "Not yet implemented" );
-        }
-
 
         //we want to load all entities
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b59abac3/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/IdBuilder.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/IdBuilder.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/IdBuilder.java
index 4291ea9..0f784a6 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/IdBuilder.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/IdBuilder.java
@@ -24,8 +24,11 @@ import 
org.apache.usergrid.corepersistence.pipeline.PipelineOperation;
 import org.apache.usergrid.corepersistence.pipeline.read.FilterFactory;
 import org.apache.usergrid.corepersistence.pipeline.Pipeline;
 import org.apache.usergrid.corepersistence.pipeline.read.FilterResult;
+import org.apache.usergrid.corepersistence.pipeline.read.ResultsPage;
 import 
org.apache.usergrid.corepersistence.pipeline.read.collect.ConnectionRefFilter;
 import 
org.apache.usergrid.corepersistence.pipeline.read.collect.ConnectionRefResumeFilter;
+import 
org.apache.usergrid.corepersistence.pipeline.read.collect.IdResumeFilter;
+import 
org.apache.usergrid.corepersistence.pipeline.read.collect.ResultsPageCollector;
 import org.apache.usergrid.corepersistence.pipeline.read.search.Candidate;
 import org.apache.usergrid.persistence.ConnectionRef;
 import org.apache.usergrid.persistence.model.entity.Entity;
@@ -33,6 +36,8 @@ import org.apache.usergrid.persistence.model.entity.Id;
 
 import com.google.common.base.Optional;
 
+import rx.Observable;
+
 
 /**
  * A builder to transition from emitting Ids in the pipeline into other 
operations

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b59abac3/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/IdResumeFilter.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/IdResumeFilter.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/IdResumeFilter.java
new file mode 100644
index 0000000..e9fd8de
--- /dev/null
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/IdResumeFilter.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.read.collect;
+
+
+import org.apache.usergrid.corepersistence.pipeline.cursor.CursorSerializer;
+import org.apache.usergrid.corepersistence.pipeline.read.AbstractPathFilter;
+import org.apache.usergrid.corepersistence.pipeline.read.FilterResult;
+import org.apache.usergrid.persistence.model.entity.Entity;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.google.common.base.Optional;
+
+import rx.Observable;
+
+
+/**
+ * A filter that is used when we can potentially serialize pages via cursor.  This will filter the first result, only if
+ * it matches the Id that was set
+ */
+public class IdResumeFilter extends AbstractPathFilter<Id, Id, Id>  {
+
+
+    @Override
+    public Observable<FilterResult<Id>> call( final 
Observable<FilterResult<Id>> filterResultObservable ) {
+
+        //filter only the first id, then map into our path for our next pass
+
+
+        //skip our first and emit if necessary
+        return filterResultObservable.skipWhile( filterResult -> {
+
+            final Optional<Id> startFromCursor = getSeekValue();
+
+            return startFromCursor.isPresent() && 
startFromCursor.get().equals( filterResult.getValue() );
+        } );
+    }
+
+
+    @Override
+    protected CursorSerializer<Id> getCursorSerializer() {
+        return IdCursorSerializer.INSTANCE;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b59abac3/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ConnectionRefQueryExecutor.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ConnectionRefQueryExecutor.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ConnectionRefQueryExecutor.java
index 798c9c7..3dfd98a 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ConnectionRefQueryExecutor.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ConnectionRefQueryExecutor.java
@@ -36,7 +36,7 @@ import rx.Observable;
 
 
 /**
- * Processes our results of entities and turns them into
+ * Processes our results of connection refs
  */
 @Deprecated//Required for 1.0 compatibility
 public class ConnectionRefQueryExecutor extends 
ObservableQueryExecutor<ConnectionRef> {

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b59abac3/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/EntityQueryExecutor.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/EntityQueryExecutor.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/EntityQueryExecutor.java
index bc9001e..0e18e31 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/EntityQueryExecutor.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/EntityQueryExecutor.java
@@ -35,7 +35,7 @@ import rx.Observable;
 
 
 /**
- * Processes our results of entities and turns them into
+ * Processes our results of entities
  */
 @Deprecated//Required for 1.0 compatibility
 public class EntityQueryExecutor extends ObservableQueryExecutor<Entity> {

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b59abac3/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ObservableQueryExecutor.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ObservableQueryExecutor.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ObservableQueryExecutor.java
index ff44416..fce1fb2 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ObservableQueryExecutor.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ObservableQueryExecutor.java
@@ -60,7 +60,7 @@ public abstract class ObservableQueryExecutor<T> implements 
QueryExecutor {
      * @param resultsPage
      * @return
      */
-    protected abstract Results createResults( final ResultsPage resultsPage );
+    protected abstract Results createResults( final ResultsPage<T> resultsPage 
);
 
 
 
@@ -69,7 +69,7 @@ public abstract class ObservableQueryExecutor<T> implements 
QueryExecutor {
      * @param resultsPage
      * @return
      */
-    private Results createResultsInternal( final ResultsPage resultsPage ) {
+    private Results createResultsInternal( final ResultsPage<T> resultsPage ) {
 
 
         final Results results = createResults( resultsPage );

Reply via email to