WIP squash

Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/b144cc2b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/b144cc2b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/b144cc2b

Branch: refs/heads/USERGRID-593
Commit: b144cc2bc6f77d35286bad3bcea8159f17cc4fc5
Parents: 9f6fa27
Author: Todd Nine <tn...@apigee.com>
Authored: Thu Apr 23 19:07:34 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Thu Apr 23 19:07:34 2015 -0600

----------------------------------------------------------------------
 stack/core/pom.xml                              |  12 --
 .../corepersistence/CpEntityManagerFactory.java |   2 +-
 .../corepersistence/CpRelationManager.java      |   9 +-
 .../usergrid/corepersistence/CpWalker.java      |   4 +-
 .../corepersistence/command/CommandBuilder.java |  60 ++++++-
 .../command/cursor/CursorCache.java             |  37 -----
 .../command/cursor/CursorSerializer.java        |  86 ++++++++++
 .../command/cursor/RequestCursor.java           |  45 ++++++
 .../command/cursor/ResponseCursor.java          |  56 +++++++
 .../command/read/AbstractCommand.java           | 111 +++++++++++++
 .../command/read/CollectCommand.java            |   6 +
 .../corepersistence/command/read/Command.java   |  28 +++-
 .../command/read/entity/EntityLoadCommand.java  | 158 +++++++++++++++++++
 .../read/graph/AbstractReadGraphCommand.java    | 101 ++++++++++++
 .../read/graph/ReadGraphCollectionCommand.java  |  53 +++++++
 .../read/graph/ReadGraphConnectionCommand.java  |  49 ++++++
 .../usergrid/corepersistence/cursor/Cursor.java |  42 -----
 .../corepersistence/cursor/CursorBuilder.java   |  28 ----
 .../cursor/CursorSerializer.java                | 112 -------------
 .../corepersistence/graph/GraphCursor.java      |  65 --------
 .../corepersistence/graph/GraphOperations.java  |  66 --------
 .../CollectionResultsLoaderFactoryImpl.java     |   1 -
 .../results/GraphQueryExecutor.java             | 152 ++++++++++++++++++
 .../rx/ApplicationObservable.java               |  42 ++---
 .../rx/EdgesFromSourceObservable.java           |   4 +-
 .../rx/EdgesToTargetObservable.java             |   4 +-
 .../persistence/collection/EntitySet.java       |  14 +-
 .../serialization/impl/EntitySetImpl.java       |  10 +-
 .../persistence/core/rx/OrderedMerge.java       |  28 ++--
 .../graph/impl/SimpleSearchByEdgeType.java      |  26 ++-
 .../impl/stage/NodeDeleteListenerImpl.java      |   5 +-
 .../impl/shard/DirectedEdgeMeta.java            |   7 +-
 .../persistence/graph/GraphManagerLoadTest.java |   6 +-
 .../graph/GraphManagerShardConsistencyIT.java   |   3 +-
 .../graph/GraphManagerStressTest.java           |   8 +-
 stack/corepersistence/pom.xml                   |   2 +-
 .../scenarios/ConnectionScenarios.scala         |   2 +-
 stack/pom.xml                                   |   1 -
 38 files changed, 1005 insertions(+), 440 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b144cc2b/stack/core/pom.xml
----------------------------------------------------------------------
diff --git a/stack/core/pom.xml b/stack/core/pom.xml
index d91208d..f12c0b8 100644
--- a/stack/core/pom.xml
+++ b/stack/core/pom.xml
@@ -473,18 +473,6 @@
     </dependency>
 
     <dependency>
-      <groupId>com.netflix.rxjava</groupId>
-      <artifactId>rxjava-core</artifactId>
-      <version>${rx.version}</version>
-    </dependency>
-
-    <dependency>
-      <groupId>com.netflix.rxjava</groupId>
-      <artifactId>rxjava-math</artifactId>
-      <version>${rx.version}</version>
-    </dependency>
-
-    <dependency>
       <groupId>com.clearspring.analytics</groupId>
       <artifactId>stream</artifactId>
       <version>2.7.0</version>

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b144cc2b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
index fe4d828..b12b6ce 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
@@ -427,7 +427,7 @@ public class CpEntityManagerFactory implements 
EntityManagerFactory, Application
 
         Observable<Edge> edges = gm.loadEdgesFromSource( new 
SimpleSearchByEdgeType(
                 fromEntityId, edgeType, Long.MAX_VALUE,
-                SearchByEdgeType.Order.DESCENDING, null ));
+                SearchByEdgeType.Order.DESCENDING, Optional.<Edge>absent() ));
 
         Iterator<Edge> iter = edges.toBlockingObservable().getIterator();
         while ( iter.hasNext() ) {

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b144cc2b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
index da39ea9..2ee136e 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
@@ -106,6 +106,7 @@ import org.apache.usergrid.utils.MapUtils;
 import org.apache.usergrid.utils.UUIDUtils;
 
 import com.codahale.metrics.Timer;
+import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 
 import me.prettyprint.hector.api.Keyspace;
@@ -331,7 +332,7 @@ public class CpRelationManager implements RelationManager {
                   public Observable<Edge> call( final String edgeType ) {
                       return gm.loadEdgesToTarget(
                           new SimpleSearchByEdgeType( cpHeadEntity.getId(), 
edgeType, Long.MAX_VALUE,
-                              SearchByEdgeType.Order.DESCENDING, null ) );
+                              SearchByEdgeType.Order.DESCENDING,  
Optional.<Edge>absent() ) );
 
                   }
               } );
@@ -400,7 +401,7 @@ public class CpRelationManager implements RelationManager {
                     public Observable<Edge> call( final String etype ) {
                         return gm.loadEdgesToTarget( new 
SimpleSearchByEdgeType(
                             cpHeadEntity.getId(), etype, Long.MAX_VALUE,
-                            SearchByEdgeType.Order.DESCENDING, null ) );
+                            SearchByEdgeType.Order.DESCENDING,  
Optional.<Edge>absent() ) );
                     }
                 } )
 
@@ -516,7 +517,7 @@ public class CpRelationManager implements RelationManager {
             CpNamingUtils.getEdgeTypeFromConnectionType( connectionType ),
             System.currentTimeMillis(),
             SearchByEdgeType.Order.DESCENDING,
-            null ) ); // last
+            Optional.<Edge>absent() ) ); // last
 
         Iterator<Edge> iterator = 
edgesToTarget.toBlockingObservable().getIterator();
         int count = 0;
@@ -541,7 +542,7 @@ public class CpRelationManager implements RelationManager {
             CpNamingUtils.getEdgeTypeFromConnectionType( connectionType ),
             System.currentTimeMillis(),
             SearchByEdgeType.Order.DESCENDING,
-            null ) ); // last
+            Optional.<Edge>absent() ) ); // last
 
         int count = edgesFromSource.take( 2 ).count().toBlocking().last();
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b144cc2b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpWalker.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpWalker.java 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpWalker.java
index 4b902d8..b2354a6 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpWalker.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpWalker.java
@@ -31,6 +31,8 @@ import 
org.apache.usergrid.persistence.graph.impl.SimpleSearchEdgeType;
 import org.apache.usergrid.persistence.model.entity.Id;
 import org.apache.usergrid.persistence.model.entity.SimpleId;
 
+import com.google.common.base.Optional;
+
 import rx.Observable;
 import rx.functions.Action1;
 import rx.functions.Func1;
@@ -111,7 +113,7 @@ public class CpWalker {
                 logger.debug( "Loading edges of type {} from node {}", 
edgeType, applicationId );
 
                 return gm.loadEdgesFromSource(  new SimpleSearchByEdgeType(
-                    applicationId, edgeType, Long.MAX_VALUE, order , null ) );
+                    applicationId, edgeType, Long.MAX_VALUE, order ,  
Optional.<Edge>absent() ) );
 
             }
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b144cc2b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/CommandBuilder.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/CommandBuilder.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/CommandBuilder.java
index f40c1d5..1d66d3a 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/CommandBuilder.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/CommandBuilder.java
@@ -20,9 +20,17 @@
 package org.apache.usergrid.corepersistence.command;
 
 
+import org.apache.usergrid.corepersistence.command.cursor.RequestCursor;
+import org.apache.usergrid.corepersistence.command.cursor.ResponseCursor;
 import org.apache.usergrid.corepersistence.command.read.CollectCommand;
+import org.apache.usergrid.corepersistence.command.read.Command;
 import org.apache.usergrid.corepersistence.command.read.TraverseCommand;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.google.common.base.Optional;
+
+import rx.Observable;
 
 
 /**
@@ -32,24 +40,66 @@ public class CommandBuilder {
 
 
     private final ApplicationScope applicationScope;
+    private final RequestCursor requestCursor;
+    private final ResponseCursor responseCursor;
+
+    private int count = 0;
+
+    private Observable<Id> currentObservable;
+
+
+       /**
+     * Our first pass, where we implement our start point as an Id until we 
can use this to perform our entire
+     * traversal.  Eventually as we untangle the existing Query service 
nightmare, the sourceId will be remove and should
+     * only be traversed from the root application
+     */
+    public CommandBuilder(final ApplicationScope applicationScope,  final Id 
sourceId, final Optional<String> requestCursor ) {
+
+        this.applicationScope = applicationScope;
+
+        //set the request cursor
+        this.requestCursor = new RequestCursor( requestCursor );
+
+        //set the response cursor
+        this.responseCursor = new ResponseCursor();
 
 
-    public CommandBuilder( final ApplicationScope applicationScope ) 
{this.applicationScope = applicationScope;}
+        this.currentObservable = Observable.just( sourceId );
+    }
 
 
     /**
      * Add a read command that will read Ids and produce Ids.  This is an 
intermediate traversal operations
-     * @param traverseCommand
-     * @return
      */
-    public CommandBuilder withTraverseCommand(final TraverseCommand 
traverseCommand ){
+    public CommandBuilder withTraverseCommand( final TraverseCommand 
traverseCommand ) {
+
+        setState( traverseCommand );
+
+        this.currentObservable = currentObservable.compose( traverseCommand );
 
         return this;
     }
 
-    public <T>  T build(final CollectCommand<T> collectCommand ){
 
+    /**
+     * Build the final collection step, and
+     */
+    public <T> Observable<T> build( final CollectCommand<T> collectCommand ) {
+        setState( collectCommand );
+
+        return currentObservable.compose( collectCommand );
     }
 
 
+    /**
+     * Set the id of the state
+     * @param command
+     */
+    private void setState( Command<?> command ) {
+        command.setId( count );
+        //done for clarity
+        count++;
+
+        command.setCursorCaches( requestCursor, responseCursor );
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b144cc2b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/cursor/CursorCache.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/cursor/CursorCache.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/cursor/CursorCache.java
deleted file mode 100644
index 4080958..0000000
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/cursor/CursorCache.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.command.cursor;
-
-
-import org.apache.usergrid.corepersistence.cursor.Cursor;
-
-
-public class CursorCache {
-
-
-    /**
-     * Get the cursor with the specified id
-     * @param id
-     * @return
-     */
-    public Cursor getCursor(final String id){
-       return null;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b144cc2b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/cursor/CursorSerializer.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/cursor/CursorSerializer.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/cursor/CursorSerializer.java
new file mode 100644
index 0000000..b45e7da
--- /dev/null
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/cursor/CursorSerializer.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.command.cursor;
+
+
+import java.io.Serializable;
+
+import com.fasterxml.jackson.core.Base64Variant;
+import com.fasterxml.jackson.core.Base64Variants;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.dataformat.smile.SmileFactory;
+
+
+/**
+ * A utility to serialize objects to/from cursors
+ */
+public class CursorSerializer {
+
+
+    private static final SmileFactory SMILE_FACTORY = new SmileFactory();
+
+    private static final ObjectMapper MAPPER = new ObjectMapper( SMILE_FACTORY 
);
+
+    private static final Base64Variant VARIANT = 
Base64Variants.MODIFIED_FOR_URL;
+
+
+    /**
+     * Serialize the serializable object as a cursor
+     */
+    public static String asCursor( final Serializable cursor ) {
+
+        try {
+            return MAPPER.writer( VARIANT ).writeValueAsString( cursor );
+        }
+        catch ( JsonProcessingException e ) {
+            throw new CursorParseException( "Unable to serialize cursor", e );
+        }
+    }
+
+
+    /**
+     * Deserialize from the cursor
+     * @param cursor
+     * @return
+     * @throws CursorParseException
+     */
+    public <T extends Serializable> T fromCursor( final String cursor, final 
Class<T> cursorClass ) throws CursorParseException {
+        try {
+
+            final JsonParser parser = MAPPER.getFactory().createParser( cursor 
);
+            return MAPPER.reader( VARIANT ).readValue( parser, cursorClass);
+        }
+        catch ( Exception e ) {
+            throw new CursorParseException( "Unable to serialize cursor", e );
+        }
+    }
+
+
+    /**
+     * Thrown when we can't parse a cursor
+     */
+    public static class CursorParseException extends RuntimeException {
+        public CursorParseException( final String message, final Throwable 
cause ) {
+            super( message, cause );
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b144cc2b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/cursor/RequestCursor.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/cursor/RequestCursor.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/cursor/RequestCursor.java
new file mode 100644
index 0000000..60d54ff
--- /dev/null
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/cursor/RequestCursor.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.command.cursor;
+
+
+import java.io.Serializable;
+
+import com.google.common.base.Optional;
+
+
+/**
+ * A cursor that has been passed in with our request.  Adds utils for parsing 
values
+ */
+public class RequestCursor {
+
+    public RequestCursor(final Optional<String> cursor){
+
+    }
+
+
+    /**
+     * Get the cursor with the specified id
+     */
+    public <T extends Serializable> T getCursor( final int id, final Class<T> 
cursorType ) {
+        return null;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b144cc2b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/cursor/ResponseCursor.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/cursor/ResponseCursor.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/cursor/ResponseCursor.java
new file mode 100644
index 0000000..02aae34
--- /dev/null
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/cursor/ResponseCursor.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.command.cursor;
+
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+
+
+/**
+ * A cursor used in rendering a response
+ */
+public class ResponseCursor {
+
+
+
+    /**
+     * We use a map b/c some indexes might be skipped
+     */
+    private Map<Integer, ? super Serializable> cursors = new HashMap<>();
+
+    /**
+     * Set the possible cursor value into the index. DOES NOT parse the 
cursor.  This is intentional for performance
+     */
+    public <T extends Serializable> void setCursor( final int id, final T 
cursor ) {
+        cursors.put( id, cursor );
+    }
+
+
+    private void ensureCapacity() {
+
+    }
+
+
+    public String encodeAsString() {
+        return null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b144cc2b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/AbstractCommand.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/AbstractCommand.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/AbstractCommand.java
new file mode 100644
index 0000000..59e1848
--- /dev/null
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/AbstractCommand.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.command.read;
+
+
+import java.io.Serializable;
+
+import javax.xml.ws.Response;
+
+import org.apache.usergrid.corepersistence.command.cursor.RequestCursor;
+import org.apache.usergrid.corepersistence.command.cursor.ResponseCursor;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+
+import com.google.common.base.Optional;
+
+
+/**
+ * Basic functionality for our commands to handle cursor IO
+ */
+public abstract class AbstractCommand<T, C extends Serializable> implements 
Command<T> {
+
+    private int id;
+    /**
+     * The cache of the cursor that was set when the read was started
+     */
+    private RequestCursor readCache;
+
+    /**
+     * The current state of the write cache.  Gets updated as we traverse the 
observables
+     */
+    private ResponseCursor writeCache;
+
+
+    /**
+     * The applicationScope
+     */
+    protected ApplicationScope applicationScope;
+
+
+    @Override
+    public void setId( final int id ) {
+        this.id = id;
+    }
+
+
+    @Override
+    public void setCursorCaches( final RequestCursor readCache, final 
ResponseCursor writeCache ) {
+        this.readCache = readCache;
+        this.writeCache = writeCache;
+    }
+
+
+    @Override
+    public void setApplicationScope( final ApplicationScope applicationScope ) 
{
+       this.applicationScope = applicationScope;
+    }
+
+
+    /**
+     * Return the parsed value of the cursor from the last request, if it 
exists
+     */
+    protected Optional<C> getCursor() {
+        final C cursor = readCache.getCursor( id, getCursorClass() );
+
+        return Optional.fromNullable( cursor );
+    }
+
+
+
+
+
+    /**
+     * Set the cursor value into the new cursor write cache
+     * @param newValue
+     */
+    protected void setCursor(final C newValue){
+        writeCache.setCursor( id, newValue );
+    }
+
+
+    /**
+     * Generate our state as a cursor
+     * @return
+     */
+    protected String generateCursor(){
+        return writeCache.encodeAsString();
+    }
+
+    /**
+     * Return the class to be used when parsing the cursor
+     */
+    protected abstract Class<C> getCursorClass();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b144cc2b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/CollectCommand.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/CollectCommand.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/CollectCommand.java
index 923cef9..5e982b6 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/CollectCommand.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/CollectCommand.java
@@ -26,5 +26,11 @@ package org.apache.usergrid.corepersistence.command.read;
  */
 public interface CollectCommand<T> extends Command<T>{
 
+    /**
+     * Set the prefered result size for the command
+     * @param resultSize
+     */
+    void setResultSize(final int resultSize);
+
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b144cc2b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/Command.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/Command.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/Command.java
index 1b8110d..ada47d2 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/Command.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/Command.java
@@ -20,25 +20,37 @@
 package org.apache.usergrid.corepersistence.command.read;
 
 
-import org.apache.commons.collections4.Transformer;
-
-import org.apache.usergrid.corepersistence.command.cursor.CursorCache;
-import org.apache.usergrid.corepersistence.cursor.Cursor;
+import org.apache.usergrid.corepersistence.command.cursor.RequestCursor;
+import org.apache.usergrid.corepersistence.command.cursor.ResponseCursor;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.model.entity.Id;
 
+import rx.Observable;
+
 
 /**
- * Interface for a read command.  This takes an input of Id, performs some 
operation, and emits
- * Id for further processing
+ * Interface for a read command.  This takes an input of Id, performs some 
operation, and emits Id for further
+ * processing
  */
-public interface Command<T> extends Transformer<Id, T> {
+public interface Command<T> extends Observable.Transformer<Id, T> {
 
 
     /**
+     * Set the id of this command in it's execution environment
+     */
+    void setId( final int id );
+
+    /**
      * Set the cursor cache into the command
+     *
      * @param readCache Set the cache that was used in the request
      * @param writeCache Set the cache to be used when writing the results
      */
-    void setCursorCaches( final CursorCache readCache, final CursorCache 
writeCache );
+    void setCursorCaches( final RequestCursor readCache, final ResponseCursor 
writeCache );
 
+    /**
+     * Set the application scope of the command
+     * @param applicationScope
+     */
+    void setApplicationScope(final ApplicationScope applicationScope);
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b144cc2b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/entity/EntityLoadCommand.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/entity/EntityLoadCommand.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/entity/EntityLoadCommand.java
new file mode 100644
index 0000000..3b6cade
--- /dev/null
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/entity/EntityLoadCommand.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.command.read.entity;
+
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.usergrid.corepersistence.command.read.AbstractCommand;
+import org.apache.usergrid.corepersistence.command.read.CollectCommand;
+import org.apache.usergrid.corepersistence.util.CpEntityMapUtils;
+import org.apache.usergrid.persistence.EntityFactory;
+import org.apache.usergrid.persistence.Results;
+import org.apache.usergrid.persistence.collection.CollectionScope;
+import org.apache.usergrid.persistence.collection.EntityCollectionManager;
+import 
org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
+import org.apache.usergrid.persistence.collection.EntitySet;
+import org.apache.usergrid.persistence.collection.MvccEntity;
+import org.apache.usergrid.persistence.model.entity.Entity;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import rx.Observable;
+import rx.functions.Func1;
+
+
+/**
+ * Loads entities from a set of Ids.
+ *
+ * TODO refactor this into a common command that both ES search and 
graphSearch can use for repair and verification
+ */
+public class EntityLoadCommand extends AbstractCommand<Results, Serializable> 
implements CollectCommand<Results> {
+
+    private final EntityCollectionManagerFactory 
entityCollectionManagerFactory;
+
+    //TODO get rid of this when merged into 2.0 dev
+    private final CollectionScope collectionScope;
+    private int resultSize;
+
+
+    public EntityLoadCommand( final EntityCollectionManagerFactory 
entityCollectionManagerFactory,
+                              final CollectionScope collectionScope ) {
+        this.entityCollectionManagerFactory = entityCollectionManagerFactory;
+        this.collectionScope = collectionScope;
+    }
+
+
+    @Override
+    protected Class<Serializable> getCursorClass() {
+        return null;
+    }
+
+
+    @Override
+    public Observable<Results> call( final Observable<? extends Id> observable 
) {
+
+        final EntityCollectionManager ecm =
+            entityCollectionManagerFactory.createCollectionManager( 
this.collectionScope );
+
+        return observable.buffer( resultSize ).flatMap( new Func1<List<? 
extends Id>, Observable<Results>>() {
+            @Override
+            public Observable<Results> call( final List<? extends Id> ids ) {
+
+                //load the entities
+                final Observable<EntitySet> entities = ecm.load( ( 
Collection<Id> ) ids );
+
+
+                return entities.flatMap( new Func1<EntitySet, 
Observable<Results>>() {
+                    @Override
+                    public Observable<Results> call( final EntitySet entitySet 
) {
+                        return createResults( entitySet );
+                    }
+                } );
+            }
+        } );
+    }
+
+
+    /**
+     * A bit kludgy from old 1.0 -> 2.0 apis.  Refactor this as we clean up 
our lower levels and create new results
+     * objects
+     */
+    public Observable<Results> createResults( final EntitySet entitySet ) {
+
+
+        return Observable.from( entitySet.getEntities() ).map(
+            new Func1<MvccEntity, org.apache.usergrid.persistence.Entity>() {
+
+                @Override
+                public org.apache.usergrid.persistence.Entity call( final 
MvccEntity mvccEntity ) {
+                    return mapEntity( mvccEntity );
+                }
+            } )
+            //filter null entities
+            .filter( new Func1<org.apache.usergrid.persistence.Entity, 
Boolean>() {
+                @Override
+                public Boolean call( final 
org.apache.usergrid.persistence.Entity entity ) {
+                    return entity == null;
+                }
+            } )
+            //buffer them and put them in as a map
+            .toList().map( new 
Func1<List<org.apache.usergrid.persistence.Entity>, Results>() {
+                @Override
+                public Results call( final 
List<org.apache.usergrid.persistence.Entity> entities ) {
+                    final Results results = Results.fromEntities( entities );
+                    results.setCursor( generateCursor() );
+
+                    return results;
+                }
+            } );
+    }
+
+
+    /**
+     * Map a new cp entity to an old entity.  May be null if not present
+     */
+    private org.apache.usergrid.persistence.Entity mapEntity( final MvccEntity 
mvccEntity ) {
+        if ( !mvccEntity.getEntity().isPresent() ) {
+            return null;
+        }
+
+
+        final Entity cpEntity = mvccEntity.getEntity().get();
+        final Id entityId = cpEntity.getId();
+
+        org.apache.usergrid.persistence.Entity entity =
+            EntityFactory.newEntity( entityId.getUuid(), entityId.getType() );
+
+        Map<String, Object> entityMap = CpEntityMapUtils.toMap( cpEntity );
+        entity.addProperties( entityMap );
+
+        return entity;
+    }
+
+
+    @Override
+    public void setResultSize( final int resultSize ) {
+        this.resultSize = resultSize;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b144cc2b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/graph/AbstractReadGraphCommand.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/graph/AbstractReadGraphCommand.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/graph/AbstractReadGraphCommand.java
new file mode 100644
index 0000000..f637510
--- /dev/null
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/graph/AbstractReadGraphCommand.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.command.read.graph;
+
+
+import org.apache.usergrid.corepersistence.command.read.AbstractCommand;
+import org.apache.usergrid.corepersistence.command.read.TraverseCommand;
+import org.apache.usergrid.persistence.graph.Edge;
+import org.apache.usergrid.persistence.graph.GraphManager;
+import org.apache.usergrid.persistence.graph.GraphManagerFactory;
+import org.apache.usergrid.persistence.graph.SearchByEdgeType;
+import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.google.common.base.Optional;
+
+import rx.Observable;
+import rx.functions.Func1;
+
+import static 
org.apache.usergrid.corepersistence.util.CpNamingUtils.getCollectionScopeNameFromCollectionName;
+
+
+/**
+ * Command for reading graph edges
+ */
+public abstract class AbstractReadGraphCommand extends AbstractCommand<Id, 
Edge> implements TraverseCommand {
+
+    private final GraphManagerFactory graphManagerFactory;
+
+
+    /**
+     * Create a new instance of our command
+     * @param graphManagerFactory
+     */
+    public AbstractReadGraphCommand( final GraphManagerFactory 
graphManagerFactory ) {
+        this.graphManagerFactory = graphManagerFactory;
+    }
+
+
+    @Override
+    public Observable<Id> call( final Observable<? extends Id> observable ) {
+
+        //get the graph manager
+        final GraphManager graphManager = 
graphManagerFactory.createEdgeManager( applicationScope );
+
+        //set our our constant state
+        final Optional<Edge> startFromCursor = getCursor();
+
+        final String edgeName = getEdgeTypeName();
+
+
+        //return all ids that are emitted from this edge
+        return observable.flatMap( new Func1<Id, Observable<Id>>() {
+
+            @Override
+            public Observable<Id> call( final Id id ) {
+
+                final SimpleSearchByEdgeType search = new 
SimpleSearchByEdgeType(id,edgeName, Long.MAX_VALUE,
+                    SearchByEdgeType.Order.DESCENDING, startFromCursor   );
+
+                return graphManager.loadEdgesFromSource( search ).map( new 
Func1<Edge, Id>() {
+                    @Override
+                    public Id call( final Edge edge ) {
+                        return edge.getTargetNode();
+                    }
+                } );
+            }
+        } );
+    }
+
+
+    @Override
+    protected Class<Edge> getCursorClass() {
+        return Edge.class;
+    }
+
+
+
+    /**
+     * Get the edge type name we should use when traversing
+     * @return
+     */
+    protected abstract String getEdgeTypeName();
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b144cc2b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/graph/ReadGraphCollectionCommand.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/graph/ReadGraphCollectionCommand.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/graph/ReadGraphCollectionCommand.java
new file mode 100644
index 0000000..aec0d8b
--- /dev/null
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/graph/ReadGraphCollectionCommand.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.command.read.graph;
+
+
+import org.apache.usergrid.persistence.graph.GraphManagerFactory;
+
+import static 
org.apache.usergrid.corepersistence.util.CpNamingUtils.getCollectionScopeNameFromCollectionName;
+
+
+/**
+ * Command for reading graph edges on a collection
+ */
+public class ReadGraphCollectionCommand extends AbstractReadGraphCommand{
+
+    private final String collectionName;
+
+
+    /**
+     * Create a new instance of our command
+     * @param graphManagerFactory
+     * @param collectionName
+     */
+    public ReadGraphCollectionCommand( final GraphManagerFactory 
graphManagerFactory, final String collectionName ) {
+       super(graphManagerFactory);
+        this.collectionName = collectionName;
+    }
+
+
+
+
+    @Override
+    protected String getEdgeTypeName() {
+        return  getCollectionScopeNameFromCollectionName(collectionName);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b144cc2b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/graph/ReadGraphConnectionCommand.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/graph/ReadGraphConnectionCommand.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/graph/ReadGraphConnectionCommand.java
new file mode 100644
index 0000000..adebd45
--- /dev/null
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/graph/ReadGraphConnectionCommand.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.command.read.graph;
+
+
+import org.apache.usergrid.persistence.graph.GraphManagerFactory;
+
+import static 
org.apache.usergrid.corepersistence.util.CpNamingUtils.getConnectionScopeName;
+
+
+/**
+ * Command for reading graph edges on a connection
+ */
+public class ReadGraphConnectionCommand extends AbstractReadGraphCommand {
+
+    private final String connectionName;
+
+
+    /**
+     * Create a new instance of our command
+     */
+    public ReadGraphConnectionCommand( final GraphManagerFactory 
graphManagerFactory, final String connectionName ) {
+        super( graphManagerFactory );
+        this.connectionName = connectionName;
+    }
+
+
+    @Override
+    protected String getEdgeTypeName() {
+        return getConnectionScopeName( connectionName );
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b144cc2b/stack/core/src/main/java/org/apache/usergrid/corepersistence/cursor/Cursor.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/cursor/Cursor.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/cursor/Cursor.java
deleted file mode 100644
index 4ec8d81..0000000
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/cursor/Cursor.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.cursor;
-
-
-import java.io.Serializable;
-
-
-/**
- * Interface for cursors
- */
-public interface Cursor<T extends Serializable> {
-
-    /**
-     * Get the type of the cursor
-     * @return
-     */
-    String getId();
-
-    /**
-     * Return the cursor to be used for serialization
-     * @return
-     */
-    T getCursor();
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b144cc2b/stack/core/src/main/java/org/apache/usergrid/corepersistence/cursor/CursorBuilder.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/cursor/CursorBuilder.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/cursor/CursorBuilder.java
deleted file mode 100644
index 26ca5a3..0000000
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/cursor/CursorBuilder.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.cursor;
-
-
-public class CursorBuilder {
-
-    public static <T> T getCursor(final String cursor, Class<T> cursorClass){
-        return null;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b144cc2b/stack/core/src/main/java/org/apache/usergrid/corepersistence/cursor/CursorSerializer.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/cursor/CursorSerializer.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/cursor/CursorSerializer.java
deleted file mode 100644
index 0cf9fa0..0000000
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/cursor/CursorSerializer.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.cursor;
-
-
-import java.io.Serializable;
-
-import com.fasterxml.jackson.core.Base64Variant;
-import com.fasterxml.jackson.core.Base64Variants;
-import com.fasterxml.jackson.core.JsonParser;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.dataformat.smile.SmileFactory;
-
-
-/**
- * A utility to serialize objects to/from cursors
- */
-public class CursorSerializer {
-
-
-    private static final SmileFactory SMILE_FACTORY = new SmileFactory();
-
-    private static final ObjectMapper MAPPER = new ObjectMapper( SMILE_FACTORY 
);
-
-    private static final Base64Variant VARIANT = 
Base64Variants.MODIFIED_FOR_URL;
-
-
-    /**
-     * Serialize the serializable object as a cursor
-     */
-    public static String asCursor( final Cursor<?> cursor ) {
-
-        final SerializableCursor serializableCursor = new SerializableCursor( 
cursor.getType(), cursor.getCursor() );
-
-        try {
-            return MAPPER.writer( VARIANT ).writeValueAsString( 
serializableCursor );
-        }
-        catch ( JsonProcessingException e ) {
-            throw new CursorParseException( "Unable to serialize cursor", e );
-        }
-    }
-
-
-    /**
-     * Deserialize from the cursor
-     * @param cursor
-     * @return
-     * @throws CursorParseException
-     */
-    public SerializableCursor fromCursor( final String cursor ) throws 
CursorParseException {
-        try {
-
-            final JsonParser parser = MAPPER.getFactory().createParser( cursor 
);
-            return MAPPER.reader( VARIANT ).readValue( parser, 
SerializableCursor.class );
-        }
-        catch ( Exception e ) {
-            throw new CursorParseException( "Unable to serialize cursor", e );
-        }
-    }
-
-
-    /**
-     * The cursor as a serialized value
-     */
-    public static class SerializableCursor{
-        private final String type;
-
-        private final Serializable value;
-
-        public SerializableCursor( final String type, final Serializable value 
) {
-            this.type = type;
-            this.value = value;
-        }
-
-
-        public String getType() {
-            return type;
-        }
-
-
-        public Serializable getValue() {
-            return value;
-        }
-    }
-
-    /**
-     * Thrown when we can't parse a cursor
-     */
-    public static class CursorParseException extends RuntimeException {
-        public CursorParseException( final String message, final Throwable 
cause ) {
-            super( message, cause );
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b144cc2b/stack/core/src/main/java/org/apache/usergrid/corepersistence/graph/GraphCursor.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/graph/GraphCursor.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/graph/GraphCursor.java
deleted file mode 100644
index 20d55ba..0000000
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/graph/GraphCursor.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.graph;
-
-
-import org.apache.usergrid.persistence.graph.Edge;
-
-import com.google.common.base.Optional;
-
-
-/**
- * Our cursor implementation for graph
- */
-public class GraphCursor {
-
-    private Optional<Edge> lastEdge;
-
-
-    /**
-     * Create the graph from the cursor.  This operation is null save, the 
cursor can be null for undefined
-     * @param cursor
-     */
-    public GraphCursor(final Optional<String> cursor){
-        if(!cursor.isPresent()){
-            lastEdge = Optional.absent();
-            return;
-        }
-
-
-
-    }
-
-
-
-    private String asCursorString(){
-
-        final String
-            return null;
-    }
-
-    private void parseCursorString(final String cursor){
-
-    }
-
-
-
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b144cc2b/stack/core/src/main/java/org/apache/usergrid/corepersistence/graph/GraphOperations.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/graph/GraphOperations.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/graph/GraphOperations.java
deleted file mode 100644
index 598b6bb..0000000
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/graph/GraphOperations.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.graph;
-
-
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.graph.Edge;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-import rx.Observable;
-
-
-/**
- * Operations we perform on a graph
- */
-public interface GraphOperations {
-
-
-    /**
-     * Write a collection edge and return the observable of the operation
-     */
-    Observable<Edge> writeCollectionEdge( final ApplicationScope 
applicationScope, final String collectionName,
-                                          final Id entityId );
-
-    /**
-     * Write the connection edge from the source to the target
-     */
-    Observable<Edge> writeConnectionEdge( final ApplicationScope 
applicationScope, final String connectionEdge,
-                                          final Id sourceId, final Id entityId 
);
-
-
-    /**
-     * Read all collection edges in the order specified for the collection 
name and the application scope
-     */
-    Observable<Edge> readCollectionEdges( final ApplicationScope 
applicationScope, final String collectionName,
-                                          final Order order, final GraphCursor 
cursor );
-
-    /**
-     * Read the
-     */
-    Observable<Edge> readConnectionEdges( final ApplicationScope 
applicationScope, final String connectionName,
-                                          final Id sourceId, final Order 
order, final GraphCursor graphCursor );
-
-
-    enum Order {
-        ASCENDING,
-        DESCENDING;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b144cc2b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/CollectionResultsLoaderFactoryImpl.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/CollectionResultsLoaderFactoryImpl.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/CollectionResultsLoaderFactoryImpl.java
index b79700b..4b43142 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/CollectionResultsLoaderFactoryImpl.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/CollectionResultsLoaderFactoryImpl.java
@@ -48,7 +48,6 @@ public class CollectionResultsLoaderFactoryImpl implements 
ResultsLoaderFactory
             verifier = new CollectionRefsVerifier();
         }
         else if ( resultsLevel == Query.Level.IDS ) {
-//            verifier = new RefsVerifier();
             verifier = new IdsVerifier();
         }
         else {

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b144cc2b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/GraphQueryExecutor.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/GraphQueryExecutor.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/GraphQueryExecutor.java
new file mode 100644
index 0000000..df1a57a
--- /dev/null
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/GraphQueryExecutor.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.results;
+
+
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.corepersistence.command.CommandBuilder;
+import org.apache.usergrid.persistence.EntityRef;
+import org.apache.usergrid.persistence.Results;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.index.EntityIndex;
+import org.apache.usergrid.persistence.index.IndexScope;
+import org.apache.usergrid.persistence.index.SearchTypes;
+import org.apache.usergrid.persistence.index.query.CandidateResults;
+import org.apache.usergrid.persistence.index.query.Query;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+
+/**
+ * This class is a nasty hack to bridge 2.0 observables into 1.0 iterators
+ * DO NOT use this as a model for moving forward, pandas will die.
+ */
+public class GraphQueryExecutor implements QueryExecutor {
+
+    private static final Logger logger = LoggerFactory.getLogger( 
GraphQueryExecutor.class );
+
+
+    private final ApplicationScope applicationScope;
+
+    private final Id sourceId;
+
+    private final String connectionName;
+
+
+    private Iterator<Results>  observableIterator;
+
+
+    public GraphQueryExecutor(final ApplicationScope appScope, final EntityRef 
source, final String connectionName  ) {
+        this.applicationScope = appScope;
+        this.sourceId =
+
+    }
+
+
+    @Override
+    public Iterator<Results> iterator() {
+        return this;
+    }
+
+
+    private void loadNextPage() {
+
+    }
+
+
+    private void build(){
+        CommandBuilder commandBuilder = new CommandBuilder(  );
+    }
+
+    /**
+     * Build results from a set of candidates, and discard those that 
represent stale indexes.
+     *
+     * @param query Query that was executed
+     * @param crs Candidates to be considered for results
+     */
+    private Results buildResults( final IndexScope indexScope, final Query 
query, final CandidateResults crs ) {
+
+        logger.debug( "buildResults()  from {} candidates", crs.size() );
+
+        //get an instance of our results loader
+        final ResultsLoader resultsLoader =
+            this.resultsLoaderFactory.getLoader( applicationScope, indexScope, 
query.getResultsLevel() );
+
+        //load the results
+        final Results results = resultsLoader.loadResults( crs );
+
+        //signal for post processing
+        resultsLoader.postProcess();
+
+
+        results.setCursor( crs.getCursor() );
+
+        //ugly and tight coupling, but we don't have a choice until we finish 
some refactoring
+        results.setQueryExecutor( this );
+
+        logger.debug( "Returning results size {}", results.size() );
+
+        return results;
+    }
+
+
+    @Override
+    public boolean hasNext() {
+
+        //we've tried to load and it's empty and we have more to load, load 
the next page
+        if ( currentResults == null ) {
+            //there's nothing left to load, nothing to do
+            if ( !moreToLoad ) {
+                return false;
+            }
+
+            //load the page
+
+            loadNextPage();
+        }
+
+
+        //see if our current results are not null
+        return currentResults != null;
+    }
+
+
+    @Override
+    public Results next() {
+        if ( !hasNext() ) {
+            throw new NoSuchElementException( "No more results present" );
+        }
+
+        final Results toReturn = currentResults;
+
+        currentResults = null;
+
+        return toReturn;
+    }
+
+    @Override
+    public void remove() {
+        throw new RuntimeException("Remove not implemented!!");
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b144cc2b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/ApplicationObservable.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/ApplicationObservable.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/ApplicationObservable.java
index 6019bca..988cd3b 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/ApplicationObservable.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/ApplicationObservable.java
@@ -39,6 +39,8 @@ import 
org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType;
 import org.apache.usergrid.persistence.model.entity.Entity;
 import org.apache.usergrid.persistence.model.entity.Id;
 
+import com.google.common.base.Optional;
+
 import rx.Observable;
 import rx.functions.Func1;
 
@@ -89,37 +91,37 @@ public class ApplicationObservable {
         //we have app infos.  For each of these app infos, we have to load the 
application itself
         Observable<Id> appIds = gm.loadEdgesFromSource(
                 new SimpleSearchByEdgeType( rootAppId, edgeType, 
Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING,
-                        null ) ).flatMap( new Func1<Edge, Observable<Id>>() {
+                        Optional.<Edge>absent() ) ).flatMap( new Func1<Edge, 
Observable<Id>>() {
             @Override
             public Observable<Id> call( final Edge edge ) {
                 //get the app info and load it
                 final Id appInfo = edge.getTargetNode();
 
                 return collectionManager.load( appInfo )
-                        //filter out null entities
-                        .filter( new Func1<Entity, Boolean>() {
-                            @Override
-                            public Boolean call( final Entity entity ) {
-                                if ( entity == null ) {
-                                    logger.warn( "Encountered a null 
application info for id {}", appInfo );
-                                    return false;
-                                }
-
-                                return true;
+                    //filter out null entities
+                    .filter( new Func1<Entity, Boolean>() {
+                        @Override
+                        public Boolean call( final Entity entity ) {
+                            if ( entity == null ) {
+                                logger.warn( "Encountered a null application 
info for id {}", appInfo );
+                                return false;
                             }
-                        } )
-                                //get the id from the entity
-                        .map( new 
Func1<org.apache.usergrid.persistence.model.entity.Entity, Id>() {
 
+                            return true;
+                        }
+                    } )
+                        //get the id from the entity
+                    .map( new 
Func1<org.apache.usergrid.persistence.model.entity.Entity, Id>() {
 
-                            @Override
-                            public Id call( final 
org.apache.usergrid.persistence.model.entity.Entity entity ) {
 
-                                final UUID uuid = ( UUID ) entity.getField( 
"applicationUuid" ).getValue();
+                        @Override
+                        public Id call( final 
org.apache.usergrid.persistence.model.entity.Entity entity ) {
 
-                                return CpNamingUtils.generateApplicationId( 
uuid );
-                            }
-                        } );
+                            final UUID uuid = ( UUID ) entity.getField( 
"applicationUuid" ).getValue();
+
+                            return CpNamingUtils.generateApplicationId( uuid );
+                        }
+                    } );
             }
         } );
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b144cc2b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/EdgesFromSourceObservable.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/EdgesFromSourceObservable.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/EdgesFromSourceObservable.java
index d3e2ee5..b23886b 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/EdgesFromSourceObservable.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/EdgesFromSourceObservable.java
@@ -31,6 +31,8 @@ import 
org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType;
 import org.apache.usergrid.persistence.graph.impl.SimpleSearchEdgeType;
 import org.apache.usergrid.persistence.model.entity.Id;
 
+import com.google.common.base.Optional;
+
 import rx.Observable;
 import rx.functions.Func1;
 
@@ -56,7 +58,7 @@ public class EdgesFromSourceObservable {
                 logger.debug( "Loading edges of edgeType {} from {}", 
edgeType, sourceNode );
 
                 return gm.loadEdgesFromSource( new SimpleSearchByEdgeType( 
sourceNode, edgeType, Long.MAX_VALUE,
-                        SearchByEdgeType.Order.DESCENDING, null ) );
+                        SearchByEdgeType.Order.DESCENDING,  
Optional.<Edge>absent()  ) );
             }
         } );
     }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b144cc2b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/EdgesToTargetObservable.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/EdgesToTargetObservable.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/EdgesToTargetObservable.java
index c5dc54d..3130a72 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/EdgesToTargetObservable.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/EdgesToTargetObservable.java
@@ -31,6 +31,8 @@ import 
org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType;
 import org.apache.usergrid.persistence.graph.impl.SimpleSearchEdgeType;
 import org.apache.usergrid.persistence.model.entity.Id;
 
+import com.google.common.base.Optional;
+
 import rx.Observable;
 import rx.functions.Func1;
 
@@ -56,7 +58,7 @@ public class EdgesToTargetObservable {
                 logger.debug( "Loading edges of edgeType {} to {}", edgeType, 
targetNode);
 
                 return gm.loadEdgesToTarget( new SimpleSearchByEdgeType( 
targetNode, edgeType, Long.MAX_VALUE,
-                        SearchByEdgeType.Order.DESCENDING, null ) );
+                        SearchByEdgeType.Order.DESCENDING,  
Optional.<Edge>absent()  ) );
             }
         } );
     }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b144cc2b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntitySet.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntitySet.java
 
b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntitySet.java
index 35b6a12..ffdfc2a 100644
--- 
a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntitySet.java
+++ 
b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntitySet.java
@@ -18,6 +18,8 @@
  */
 package org.apache.usergrid.persistence.collection;
 
+import java.util.Collection;
+
 import org.apache.usergrid.persistence.model.entity.Id;
 
 
@@ -31,17 +33,23 @@ public interface EntitySet {
      * @param entityId
      * @return
      */
-    public MvccEntity getEntity(Id entityId);
+    MvccEntity getEntity( Id entityId );
+
+    /**
+     * Get all entities in the set
+     * @return
+     */
+    Collection<MvccEntity> getEntities();
 
     /**
      * Get the number of entities in this set
      * @return
      */
-    public int size();
+    int size();
 
     /**
      * Return true if the set is empty
      * @return
      */
-    public boolean isEmpty();
+    boolean isEmpty();
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b144cc2b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/EntitySetImpl.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/EntitySetImpl.java
 
b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/EntitySetImpl.java
index 921093b..126f9f3 100644
--- 
a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/EntitySetImpl.java
+++ 
b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/EntitySetImpl.java
@@ -18,6 +18,7 @@ package 
org.apache.usergrid.persistence.collection.serialization.impl;/*
  */
 
 
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -32,8 +33,7 @@ public class EntitySetImpl implements EntitySet {
     private final Map<Id, MvccEntity> entities;
 
 
-    public EntitySetImpl(
-                          final int expectedSize ) {
+    public EntitySetImpl(final int expectedSize ) {
         this.entities = new HashMap<>( expectedSize );
     }
 
@@ -48,6 +48,12 @@ public class EntitySetImpl implements EntitySet {
         return entities.get( entityId );
     }
 
+    public Collection<MvccEntity> getEntities(){
+        return entities.values();
+    }
+
+
+
 
     @Override
     public int size() {

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b144cc2b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/OrderedMerge.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/OrderedMerge.java
 
b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/OrderedMerge.java
index cdad0d1..613585e 100644
--- 
a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/OrderedMerge.java
+++ 
b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/OrderedMerge.java
@@ -94,20 +94,20 @@ public final class OrderedMerge<T> implements 
Observable.OnSubscribe<T> {
 
             innerObservers[i] = inner;
         }
-
-        /**
-         * Once we're set up, begin the subscription to sub observables
-         */
-        for ( int i = 0; i < observables.length; i++ ) {
-            //subscribe after setting them up
-            //add our subscription to the composite for future cancellation
-            Subscription subscription = observables[i].subscribe( 
innerObservers[i] );
-
-            csub.add( subscription );
-
-            //add the internal composite subscription
-            outerOperation.add( csub );
-        }
+// TODO, if this merge makes it into 2.0-dev remove this and use 2.0 -dev
+//        /**
+//         * Once we're set up, begin the subscription to sub observables
+//         */
+//        for ( int i = 0; i < observables.length; i++ ) {
+//            //subscribe after setting them up
+//            //add our subscription to the composite for future cancellation
+//            Subscription subscription = observables[i].subscribe( 
innerObservers[i] );
+//
+//            csub.add( subscription );
+//
+//            //add the internal composite subscription
+//            outerOperation.add( csub );
+//        }
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b144cc2b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleSearchByEdgeType.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleSearchByEdgeType.java
 
b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleSearchByEdgeType.java
index 6bc8b1b..1687162 100644
--- 
a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleSearchByEdgeType.java
+++ 
b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleSearchByEdgeType.java
@@ -51,21 +51,39 @@ public class SimpleSearchByEdgeType implements 
SearchByEdgeType{
      * @param order The order order.  Descending is most efficient
      * @param last The value to start seeking from.  Must be >= this value
      * @param order
+     *
+     * //TODO, make last an optional
      */
-    public SimpleSearchByEdgeType( final Id node, final String type, final 
long maxTimestamp, final Order order, final Edge last
-                                   ) {
+    public SimpleSearchByEdgeType( final Id node, final String type, final 
long maxTimestamp, final Order order, final Edge last ) {
+        this(node, type, maxTimestamp, order, Optional.fromNullable(last));
+    }
+
+
+    /**
+     * Create the search modules
+     * @param node The node to search from
+     * @param type The edge type
+     * @param maxTimestamp The maximum timestamp to return
+     * @param order The order order.  Descending is most efficient
+     * @param last The value to start seeking from.  Must be >= this value
+     * @param order
+     *
+     * //TODO, make last an optional
+     */
+    public SimpleSearchByEdgeType( final Id node, final String type, final 
long maxTimestamp, final Order order, final Optional<Edge> last ) {
 
         Preconditions.checkNotNull( order, "order is required");
-        ValidationUtils.verifyIdentity(node);
+        ValidationUtils.verifyIdentity( node );
         ValidationUtils.verifyString( type, "type" );
         GraphValidation.validateTimestamp( maxTimestamp, "maxTimestamp" );
+        Preconditions.checkNotNull( last, "last is required" );
 
 
         this.node = node;
         this.type = type;
         this.maxTimestamp = maxTimestamp;
         this.order = order;
-        this.last = Optional.fromNullable(last);
+        this.last = last;
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b144cc2b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/NodeDeleteListenerImpl.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/NodeDeleteListenerImpl.java
 
b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/NodeDeleteListenerImpl.java
index f167f0c..03f1df8 100644
--- 
a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/NodeDeleteListenerImpl.java
+++ 
b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/NodeDeleteListenerImpl.java
@@ -32,6 +32,7 @@ import org.apache.usergrid.persistence.core.guice.ProxyImpl;
 import org.apache.usergrid.persistence.core.hystrix.HystrixCassandra;
 import org.apache.usergrid.persistence.core.rx.ObservableIterator;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.graph.Edge;
 import org.apache.usergrid.persistence.graph.GraphFig;
 import org.apache.usergrid.persistence.graph.MarkedEdge;
 import org.apache.usergrid.persistence.graph.SearchByEdgeType;
@@ -160,7 +161,7 @@ public class NodeDeleteListenerImpl implements 
NodeDeleteListener {
                             @Override
                             protected Iterator<MarkedEdge> getIterator() {
                                 return storageSerialization.getEdgesToTarget( 
scope,
-                                        new SimpleSearchByEdgeType( node, 
edgeType, maxVersion, SearchByEdgeType.Order.DESCENDING, null ) );
+                                        new SimpleSearchByEdgeType( node, 
edgeType, maxVersion, SearchByEdgeType.Order.DESCENDING,  
Optional.<Edge>absent() ) );
                             }
                         } );
                     }
@@ -177,7 +178,7 @@ public class NodeDeleteListenerImpl implements 
NodeDeleteListener {
                             @Override
                             protected Iterator<MarkedEdge> getIterator() {
                                 return 
storageSerialization.getEdgesFromSource( scope,
-                                        new SimpleSearchByEdgeType( node, 
edgeType, maxVersion, SearchByEdgeType.Order.DESCENDING, null ) );
+                                        new SimpleSearchByEdgeType( node, 
edgeType, maxVersion, SearchByEdgeType.Order.DESCENDING,  
Optional.<Edge>absent() ) );
                             }
                         } );
                     }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b144cc2b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/DirectedEdgeMeta.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/DirectedEdgeMeta.java
 
b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/DirectedEdgeMeta.java
index 6bb467f..6bf6178 100644
--- 
a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/DirectedEdgeMeta.java
+++ 
b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/DirectedEdgeMeta.java
@@ -31,6 +31,7 @@ import java.util.Map;
 import java.util.UUID;
 
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.graph.Edge;
 import org.apache.usergrid.persistence.graph.MarkedEdge;
 import org.apache.usergrid.persistence.graph.SearchByEdgeType;
 import org.apache.usergrid.persistence.graph.SearchByIdType;
@@ -39,6 +40,7 @@ import 
org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType;
 import org.apache.usergrid.persistence.graph.impl.SimpleSearchByIdType;
 import org.apache.usergrid.persistence.model.entity.Id;
 
+import com.google.common.base.Optional;
 import com.netflix.astyanax.MutationBatch;
 
 
@@ -272,7 +274,8 @@ public abstract class DirectedEdgeMeta {
                 final Id sourceId = nodes[0].id;
                 final String edgeType = types[0];
 
-                final SearchByEdgeType search = new SimpleSearchByEdgeType( 
sourceId, edgeType, maxValue, order, null);
+                final SearchByEdgeType search = new SimpleSearchByEdgeType( 
sourceId, edgeType, maxValue, order,
+                    Optional.<Edge>absent());
 
                 return serialization.getEdgesFromSource( edgeColumnFamilies, 
scope, search, shards );
             }
@@ -392,7 +395,7 @@ public abstract class DirectedEdgeMeta {
                 final Id targetId = nodes[0].id;
                 final String edgeType = types[0];
 
-                final SearchByEdgeType search = new SimpleSearchByEdgeType( 
targetId, edgeType, maxValue, order, null);
+                final SearchByEdgeType search = new SimpleSearchByEdgeType( 
targetId, edgeType, maxValue, order,  Optional.<Edge>absent());
 
                 return serialization.getEdgesToTarget( edgeColumnFamilies, 
scope, search, shards );
             }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b144cc2b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerLoadTest.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerLoadTest.java
 
b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerLoadTest.java
index d3bf24e..c91fe89 100644
--- 
a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerLoadTest.java
+++ 
b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerLoadTest.java
@@ -48,6 +48,7 @@ import 
org.apache.usergrid.persistence.graph.guice.TestGraphModule;
 import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType;
 import org.apache.usergrid.persistence.model.entity.Id;
 
+import com.google.common.base.Optional;
 import com.google.inject.Inject;
 
 import rx.Observable;
@@ -112,7 +113,8 @@ public class GraphManagerLoadTest {
 
             @Override
             public Observable<Edge> doSearch( final GraphManager manager ) {
-                 return manager.loadEdgesFromSource( new 
SimpleSearchByEdgeType( sourceId, "test", System.currentTimeMillis(), 
SearchByEdgeType.Order.DESCENDING, null) );
+                 return manager.loadEdgesFromSource( new 
SimpleSearchByEdgeType( sourceId, "test", System.currentTimeMillis(), 
SearchByEdgeType.Order.DESCENDING,  Optional
+                                      .<Edge>absent()) );
             }
         };
 
@@ -139,7 +141,7 @@ public class GraphManagerLoadTest {
 
             @Override
             public Observable<Edge> doSearch( final GraphManager manager ) {
-                return manager.loadEdgesToTarget( new SimpleSearchByEdgeType( 
targetId, "test", System.currentTimeMillis(), 
SearchByEdgeType.Order.DESCENDING, null ) );
+                return manager.loadEdgesToTarget( new SimpleSearchByEdgeType( 
targetId, "test", System.currentTimeMillis(), 
SearchByEdgeType.Order.DESCENDING,  Optional.<Edge>absent() ) );
             }
         };
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b144cc2b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
 
b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
index 095f855..d702556 100644
--- 
a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
+++ 
b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
@@ -60,6 +60,7 @@ import org.apache.usergrid.persistence.model.entity.Id;
 import com.codahale.metrics.Meter;
 import com.codahale.metrics.MetricRegistry;
 import com.codahale.metrics.Slf4jReporter;
+import com.google.common.base.Optional;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
@@ -166,7 +167,7 @@ public class GraphManagerShardConsistencyIT {
             public Observable<Edge> doSearch( final GraphManager manager ) {
                 return manager.loadEdgesFromSource(
                         new SimpleSearchByEdgeType( sourceId, edgeType, 
Long.MAX_VALUE,
-                                SearchByEdgeType.Order.DESCENDING, null ) );
+                                SearchByEdgeType.Order.DESCENDING,  
Optional.<Edge>absent() ) );
             }
         };
 

Reply via email to