Refactor into commands WIP

Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/d5978cca
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/d5978cca
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/d5978cca

Branch: refs/heads/USERGRID-593
Commit: d5978cca07366c43314687081973f48a80b61d8c
Parents: 60617d2
Author: Todd Nine <tn...@apigee.com>
Authored: Fri Apr 24 14:45:50 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Fri Apr 24 14:45:50 2015 -0600

----------------------------------------------------------------------
 .../corepersistence/CpRelationManager.java      |   6 +-
 .../corepersistence/command/CommandBuilder.java |  10 +-
 .../command/read/CollectCommand.java            |  36 ---
 .../corepersistence/command/read/Collector.java |  36 +++
 .../command/read/ReadCommandFactory.java        |  81 +++++++
 .../AbstractQueryElasticSearchCollector.java    | 107 +++++++++
 .../QueryCollectionElasticSearchCollector.java  |  62 +++++
 .../QueryConnectionElasticSearchCollector.java  |  62 +++++
 .../impl/CollectionRefsVerifier.java            |  44 ++++
 .../CollectionResultsLoaderFactoryImpl.java     |  59 +++++
 .../impl/ConnectionRefsVerifier.java            |  59 +++++
 .../ConnectionResultsLoaderFactoryImpl.java     |  65 +++++
 .../impl/ElasticSearchQueryExecutor.java        | 238 ++++++++++++++++++
 .../read/elasticsearch/impl/EntityVerifier.java | 127 ++++++++++
 .../elasticsearch/impl/FilteringLoader.java     | 226 +++++++++++++++++
 .../read/elasticsearch/impl/IdsVerifier.java    |  46 ++++
 .../read/elasticsearch/impl/ResultsLoader.java  |  43 ++++
 .../impl/ResultsLoaderFactory.java              |  41 ++++
 .../elasticsearch/impl/ResultsVerifier.java     |  52 ++++
 .../elasticsearch/impl/VersionVerifier.java     |  85 +++++++
 .../read/entity/EntityLoadCollector.java        | 138 +++++++++++
 .../command/read/entity/EntityLoadCommand.java  | 134 -----------
 .../read/graph/ReadGraphCollectionCommand.java  |   3 +
 .../read/graph/ReadGraphConnectionCommand.java  |   3 +
 .../results/AbstractGraphQueryExecutor.java     |   4 +-
 .../results/CollectionRefsVerifier.java         |  44 ----
 .../CollectionResultsLoaderFactoryImpl.java     |  59 -----
 .../results/ConnectionRefsVerifier.java         |  61 -----
 .../ConnectionResultsLoaderFactoryImpl.java     |  65 -----
 .../results/ElasticSearchQueryExecutor.java     | 240 -------------------
 .../corepersistence/results/EntityVerifier.java | 127 ----------
 .../results/FilteringLoader.java                | 226 -----------------
 .../corepersistence/results/IdsVerifier.java    |  46 ----
 .../results/ObservableQueryExecutor.java        |  76 ++++++
 .../corepersistence/results/ResultsLoader.java  |  43 ----
 .../results/ResultsLoaderFactory.java           |  41 ----
 .../results/ResultsVerifier.java                |  52 ----
 .../results/VersionVerifier.java                |  85 -------
 38 files changed, 1663 insertions(+), 1269 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d5978cca/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
index 86ba300..de7fef1 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
@@ -32,10 +32,10 @@ import org.springframework.util.Assert;
 
 import org.apache.usergrid.corepersistence.index.AsyncIndexService;
 import 
org.apache.usergrid.corepersistence.results.CollectionGraphQueryExecutor;
-import 
org.apache.usergrid.corepersistence.results.CollectionResultsLoaderFactoryImpl;
+import 
org.apache.usergrid.corepersistence.command.read.elasticsearch.impl.CollectionResultsLoaderFactoryImpl;
 import 
org.apache.usergrid.corepersistence.results.ConnectionGraphQueryExecutor;
-import 
org.apache.usergrid.corepersistence.results.ConnectionResultsLoaderFactoryImpl;
-import org.apache.usergrid.corepersistence.results.ElasticSearchQueryExecutor;
+import 
org.apache.usergrid.corepersistence.command.read.elasticsearch.impl.ConnectionResultsLoaderFactoryImpl;
+import 
org.apache.usergrid.corepersistence.command.read.elasticsearch.impl.ElasticSearchQueryExecutor;
 import org.apache.usergrid.corepersistence.results.QueryExecutor;
 import org.apache.usergrid.corepersistence.util.CpEntityMapUtils;
 import org.apache.usergrid.corepersistence.util.CpNamingUtils;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d5978cca/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/CommandBuilder.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/CommandBuilder.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/CommandBuilder.java
index de76f50..d9d6a12 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/CommandBuilder.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/CommandBuilder.java
@@ -22,7 +22,7 @@ package org.apache.usergrid.corepersistence.command;
 
 import org.apache.usergrid.corepersistence.command.cursor.RequestCursor;
 import org.apache.usergrid.corepersistence.command.cursor.ResponseCursor;
-import org.apache.usergrid.corepersistence.command.read.CollectCommand;
+import org.apache.usergrid.corepersistence.command.read.Collector;
 import org.apache.usergrid.corepersistence.command.read.Command;
 import org.apache.usergrid.corepersistence.command.read.TraverseCommand;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
@@ -87,12 +87,12 @@ public class CommandBuilder {
     /**
      * Build the final collection step, and
      */
-    public <T> Observable<T> build( final CollectCommand<T> collectCommand ) {
-        setState( collectCommand );
+    public <T> Observable<T> build( final Collector<T> collector ) {
+        setState( collector );
 
-        collectCommand.setLimit( limit );
+        collector.setLimit( limit );
 
-        return currentObservable.compose( collectCommand );
+        return currentObservable.compose( collector );
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d5978cca/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/CollectCommand.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/CollectCommand.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/CollectCommand.java
deleted file mode 100644
index c2ad931..0000000
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/CollectCommand.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.command.read;
-
-
-/**
- * A command that is used to reduce our stream of results into a final output
- * @param <T>
- */
-public interface CollectCommand<T> extends Command<T>{
-
-    /**
-     * Set the prefered result size for the command
-     * @param resultSize
-     */
-    void setLimit( final int resultSize );
-
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d5978cca/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/Collector.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/Collector.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/Collector.java
new file mode 100644
index 0000000..2804db4
--- /dev/null
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/Collector.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.command.read;
+
+
+/**
+ * A command that is used to reduce our stream of results into a final output
+ * @param <T>
+ */
+public interface Collector<T> extends Command<T>{
+
+    /**
+     * Set the preferred result size for the command
+     * @param limit
+     */
+    void setLimit( final int limit );
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d5978cca/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/ReadCommandFactory.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/ReadCommandFactory.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/ReadCommandFactory.java
new file mode 100644
index 0000000..07f300a
--- /dev/null
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/ReadCommandFactory.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.command.read;
+
+
+import 
org.apache.usergrid.corepersistence.command.read.elasticsearch.QueryCollectionElasticSearchCollector;
+import 
org.apache.usergrid.corepersistence.command.read.elasticsearch.QueryConnectionElasticSearchCollector;
+import 
org.apache.usergrid.corepersistence.command.read.entity.EntityLoadCollector;
+import 
org.apache.usergrid.corepersistence.command.read.graph.ReadGraphCollectionCommand;
+import 
org.apache.usergrid.corepersistence.command.read.graph.ReadGraphConnectionCommand;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+
+/**
+ * A factory for generating read commands
+ */
+public interface ReadCommandFactory {
+
+
+    /**
+     * Generate a new instance of the command with the specified parameters
+     * @param applicationScope
+     * @param sourceId
+     * @param collectionName
+     * @return
+     */
+    ReadGraphCollectionCommand readGraphCollectionCommand(final 
ApplicationScope applicationScope, final Id sourceId, final String 
collectionName);
+
+    /**
+     * Generate a new instance of the command with the specified parameters
+     * @param applicationScope
+     * @param sourceId
+     * @param connectionName
+     * @return
+     */
+    ReadGraphConnectionCommand readGraphConnectionCommand(final 
ApplicationScope applicationScope, final Id sourceId, final String 
connectionName);
+
+    /**
+     * Generate a new instance of the command with the specified parameters
+     * @param applicationScope
+     * @return
+     */
+    EntityLoadCollector entityLoadCollector(final ApplicationScope 
applicationScope);
+
+    /**
+     * Generate a new instance of the command with the specified parameters
+     * @param applicationScope
+     * @param sourceId
+     * @param collectionName
+     * @return
+     */
+    QueryCollectionElasticSearchCollector 
queryCollectionElasticSearchCollector(final ApplicationScope applicationScope, 
final Id sourceId, final String collectionName);
+
+
+    /**
+     * Generate a new instance of the command with the specified parameters
+     * @param applicationScope
+     * @param sourceId
+     * @param connectionName
+     * @return
+     */
+    QueryConnectionElasticSearchCollector 
queryConnectionElasticSearchCollector(final ApplicationScope applicationScope, 
final Id sourceId, final String connectionName);
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d5978cca/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/elasticsearch/AbstractQueryElasticSearchCollector.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/elasticsearch/AbstractQueryElasticSearchCollector.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/elasticsearch/AbstractQueryElasticSearchCollector.java
new file mode 100644
index 0000000..18cdeb0
--- /dev/null
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/elasticsearch/AbstractQueryElasticSearchCollector.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.command.read.elasticsearch;
+
+
+import java.util.Iterator;
+
+import org.apache.usergrid.corepersistence.command.read.AbstractCommand;
+import org.apache.usergrid.corepersistence.command.read.Collector;
+import 
org.apache.usergrid.corepersistence.command.read.elasticsearch.impl.ElasticSearchQueryExecutor;
+import org.apache.usergrid.corepersistence.results.QueryExecutor;
+import 
org.apache.usergrid.corepersistence.command.read.elasticsearch.impl.ResultsLoaderFactory;
+import org.apache.usergrid.persistence.Query;
+import org.apache.usergrid.persistence.Results;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.index.ApplicationEntityIndex;
+import org.apache.usergrid.persistence.index.SearchEdge;
+import org.apache.usergrid.persistence.index.SearchTypes;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.google.inject.Inject;
+
+import rx.Observable;
+
+
+/**
+ * A command that will query and load elasticsearch
+ *
+ * On future iteration, this needs to be split into 2 commands 1 that loads 
the candidate results and validates the
+ * versions then another that will search and load
+ *
+ * TODO, split this into 3 separate observables
+ *
+ * 1) An observable that emits candidate results
+ * 2) An observable that validates versions by uuid (for Traverse commands)
+ * 3) An observable that emits Results as a collector (for final commands)
+ */
+public abstract class AbstractQueryElasticSearchCollector extends 
AbstractCommand<Results, Integer> implements
+    Collector<Results> {
+
+
+    private final ResultsLoaderFactory resultsLoaderFactory;
+    private final ApplicationEntityIndex entityIndex;
+                                           private final ApplicationScope 
applicationScope;
+    private final SearchEdge indexScope;
+    private final SearchTypes searchTypes;
+    private final Query query;
+    private int limit;
+
+
+
+    protected AbstractQueryElasticSearchCollector( final 
ApplicationEntityIndex entityIndex,
+                                                   final ApplicationScope 
applicationScope, final SearchEdge indexScope,
+                                                   final SearchTypes 
searchTypes, final Query query ) {
+        this.entityIndex = entityIndex;
+        this.applicationScope = applicationScope;
+        this.indexScope = indexScope;
+        this.searchTypes = searchTypes;
+        this.query = query;
+        this.resultsLoaderFactory = getResultsLoaderFactory();
+    }
+
+
+    @Override
+    public Observable<Results> call( final Observable<Id> idObservable ) {
+       final Iterable<Results> executor =
+            new ElasticSearchQueryExecutor( resultsLoaderFactory, entityIndex, 
applicationScope, indexScope, searchTypes,
+                query.withLimit( limit ));
+
+        return Observable.from(executor);
+    }
+
+
+    /**
+     * Get the results loader factory
+     * @return
+     */
+    protected abstract ResultsLoaderFactory getResultsLoaderFactory();
+
+    @Override
+    protected Class<Integer> getCursorClass() {
+        return Integer.class;
+    }
+
+
+    @Override
+    public void setLimit( final int limit ) {
+        this.limit = limit;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d5978cca/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/elasticsearch/QueryCollectionElasticSearchCollector.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/elasticsearch/QueryCollectionElasticSearchCollector.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/elasticsearch/QueryCollectionElasticSearchCollector.java
new file mode 100644
index 0000000..2bbcc48
--- /dev/null
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/elasticsearch/QueryCollectionElasticSearchCollector.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.command.read.elasticsearch;
+
+
+import org.apache.usergrid.corepersistence.ManagerCache;
+import 
org.apache.usergrid.corepersistence.command.read.elasticsearch.impl.ConnectionResultsLoaderFactoryImpl;
+import 
org.apache.usergrid.corepersistence.command.read.elasticsearch.impl.ResultsLoaderFactory;
+import org.apache.usergrid.persistence.EntityRef;
+import org.apache.usergrid.persistence.Query;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.index.ApplicationEntityIndex;
+import org.apache.usergrid.persistence.index.SearchEdge;
+import org.apache.usergrid.persistence.index.SearchTypes;
+
+import com.google.inject.Inject;
+
+
+/**
+ * Command for querying collections
+ */
+public class QueryCollectionElasticSearchCollector extends 
AbstractQueryElasticSearchCollector {
+
+    private final ManagerCache managerCache;
+    private final EntityRef headEntity;
+    private final String connectionName;
+
+    @Inject
+    protected QueryCollectionElasticSearchCollector( final 
ApplicationEntityIndex entityIndex,
+                                                     final ApplicationScope 
applicationScope,
+                                                     final SearchEdge 
indexScope, final SearchTypes searchTypes,
+                                                     final Query query, final 
ManagerCache managerCache,
+                                                     final EntityRef 
headEntity, final String connectionName ) {
+        super( entityIndex, applicationScope, indexScope, searchTypes, query );
+        this.managerCache = managerCache;
+        this.headEntity = headEntity;
+        this.connectionName = connectionName;
+    }
+
+
+    @Override
+    protected ResultsLoaderFactory getResultsLoaderFactory() {
+        return new ConnectionResultsLoaderFactoryImpl( managerCache, 
headEntity, connectionName );
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d5978cca/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/elasticsearch/QueryConnectionElasticSearchCollector.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/elasticsearch/QueryConnectionElasticSearchCollector.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/elasticsearch/QueryConnectionElasticSearchCollector.java
new file mode 100644
index 0000000..0e2f221
--- /dev/null
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/elasticsearch/QueryConnectionElasticSearchCollector.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.command.read.elasticsearch;
+
+
+import org.apache.usergrid.corepersistence.ManagerCache;
+import 
org.apache.usergrid.corepersistence.command.read.elasticsearch.impl.ConnectionResultsLoaderFactoryImpl;
+import 
org.apache.usergrid.corepersistence.command.read.elasticsearch.impl.ResultsLoaderFactory;
+import org.apache.usergrid.persistence.EntityRef;
+import org.apache.usergrid.persistence.Query;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.index.ApplicationEntityIndex;
+import org.apache.usergrid.persistence.index.SearchEdge;
+import org.apache.usergrid.persistence.index.SearchTypes;
+
+import com.google.inject.Inject;
+
+
+/**
+ * Command for querying connections
+ */
+public class QueryConnectionElasticSearchCollector extends 
AbstractQueryElasticSearchCollector {
+
+    private final ManagerCache managerCache;
+    private final EntityRef headEntity;
+    private final String connectionName;
+
+    @Inject
+    protected QueryConnectionElasticSearchCollector( final 
ApplicationEntityIndex entityIndex,
+                                                     final ApplicationScope 
applicationScope,
+                                                     final SearchEdge 
indexScope, final SearchTypes searchTypes,
+                                                     final Query query, final 
ManagerCache managerCache,
+                                                     final EntityRef 
headEntity, final String connectionName ) {
+        super( entityIndex, applicationScope, indexScope, searchTypes, query );
+        this.managerCache = managerCache;
+        this.headEntity = headEntity;
+        this.connectionName = connectionName;
+    }
+
+
+    @Override
+    protected ResultsLoaderFactory getResultsLoaderFactory() {
+        return new ConnectionResultsLoaderFactoryImpl( managerCache, 
headEntity, connectionName );
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d5978cca/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/elasticsearch/impl/CollectionRefsVerifier.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/elasticsearch/impl/CollectionRefsVerifier.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/elasticsearch/impl/CollectionRefsVerifier.java
new file mode 100644
index 0000000..4b67ae8
--- /dev/null
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/elasticsearch/impl/CollectionRefsVerifier.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.usergrid.corepersistence.command.read.elasticsearch.impl;
+
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.usergrid.persistence.EntityRef;
+import org.apache.usergrid.persistence.Results;
+import org.apache.usergrid.persistence.SimpleEntityRef;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+
+public class CollectionRefsVerifier extends VersionVerifier {
+
+
+
+    @Override
+    public Results getResults( final Collection<Id> ids ) {
+        List<EntityRef> refs = new ArrayList<EntityRef>(ids.size());
+        for ( Id id : ids ) {
+            refs.add( new SimpleEntityRef( id.getType(), id.getUuid() ) );
+        }
+        return Results.fromRefList( refs );
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d5978cca/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/elasticsearch/impl/CollectionResultsLoaderFactoryImpl.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/elasticsearch/impl/CollectionResultsLoaderFactoryImpl.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/elasticsearch/impl/CollectionResultsLoaderFactoryImpl.java
new file mode 100644
index 0000000..f9dd4e1
--- /dev/null
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/elasticsearch/impl/CollectionResultsLoaderFactoryImpl.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.command.read.elasticsearch.impl;
+
+
+import org.apache.usergrid.corepersistence.ManagerCache;
+import org.apache.usergrid.persistence.Query;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.index.SearchEdge;
+
+
+/**
+ * Factory for creating results loaders
+ */
+public class CollectionResultsLoaderFactoryImpl implements 
ResultsLoaderFactory {
+
+    private final ManagerCache managerCache;
+
+
+    public CollectionResultsLoaderFactoryImpl( final ManagerCache managerCache 
) {
+        this.managerCache = managerCache;
+    }
+
+
+    @Override
+    public ResultsLoader getLoader( final ApplicationScope applicationScope, 
final SearchEdge scope, final Query.Level resultsLevel ) {
+
+        ResultsVerifier verifier;
+
+        if ( resultsLevel == Query.Level.REFS ) {
+            verifier = new CollectionRefsVerifier();
+        }
+        else if ( resultsLevel == Query.Level.IDS ) {
+            verifier = new IdsVerifier();
+        }
+        else {
+            verifier = new EntityVerifier(Query.MAX_LIMIT);
+        }
+
+        return new FilteringLoader( managerCache, verifier, applicationScope, 
scope );
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d5978cca/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/elasticsearch/impl/ConnectionRefsVerifier.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/elasticsearch/impl/ConnectionRefsVerifier.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/elasticsearch/impl/ConnectionRefsVerifier.java
new file mode 100644
index 0000000..bffb53a
--- /dev/null
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/elasticsearch/impl/ConnectionRefsVerifier.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.usergrid.corepersistence.command.read.elasticsearch.impl;
+
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.usergrid.persistence.ConnectionRef;
+import org.apache.usergrid.persistence.EntityRef;
+import org.apache.usergrid.persistence.Results;
+import org.apache.usergrid.persistence.cassandra.ConnectionRefImpl;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import static org.apache.usergrid.persistence.SimpleEntityRef.ref;
+
+
+/**
+ * Verifier that reports its results as connection references.  Each id handed
+ * to {@link #getResults(Collection)} becomes the target of a connection from
+ * the configured owner with the configured connection type.  Loading and
+ * validation are inherited from {@link VersionVerifier}.
+ */
+public class ConnectionRefsVerifier extends VersionVerifier {
+
+
+    private final EntityRef ownerId;
+    private final String connectionType;
+
+
+    /**
+     * @param ownerId The source entity of every connection produced
+     * @param connectionType The connection type of every connection produced
+     */
+    public ConnectionRefsVerifier( final EntityRef ownerId, final String connectionType ) {
+        this.ownerId = ownerId;
+        this.connectionType = connectionType;
+    }
+
+
+    /**
+     * Build one connection reference per id, in the iteration order of the collection.
+     *
+     * @param ids The ids to expose as connection targets
+     * @return Results created from the connection references
+     */
+    @Override
+    public Results getResults( final Collection<Id> ids ) {
+        final List<ConnectionRef> connections = new ArrayList<>();
+
+        for ( final Id targetId : ids ) {
+            connections.add( connectionTo( targetId ) );
+        }
+
+        return Results.fromConnections( connections );
+    }
+
+
+    /**
+     * Create the connection from the owner to the entity identified by the id.
+     */
+    private ConnectionRef connectionTo( final Id targetId ) {
+        return new ConnectionRefImpl( ownerId, connectionType, ref( targetId.getType(), targetId.getUuid() ) );
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d5978cca/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/elasticsearch/impl/ConnectionResultsLoaderFactoryImpl.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/elasticsearch/impl/ConnectionResultsLoaderFactoryImpl.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/elasticsearch/impl/ConnectionResultsLoaderFactoryImpl.java
new file mode 100644
index 0000000..707e933
--- /dev/null
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/elasticsearch/impl/ConnectionResultsLoaderFactoryImpl.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.command.read.elasticsearch.impl;
+
+
+import org.apache.usergrid.corepersistence.ManagerCache;
+import org.apache.usergrid.persistence.EntityRef;
+import org.apache.usergrid.persistence.Query;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.index.SearchEdge;
+
+
+/**
+ * {@link ResultsLoaderFactory} used for connection searches.  It picks a
+ * {@link ResultsVerifier} that matches the requested results level and wraps
+ * it in a {@link FilteringLoader}.
+ */
+public class ConnectionResultsLoaderFactoryImpl implements ResultsLoaderFactory {
+
+    private final ManagerCache managerCache;
+    private final EntityRef ownerId;
+    private final String connectionType;
+
+
+    /**
+     * @param managerCache The manager cache passed to every loader this factory creates
+     * @param ownerId The source entity passed to the connection verifier
+     * @param connectionType The connection type passed to the connection verifier
+     */
+    public ConnectionResultsLoaderFactoryImpl( final ManagerCache managerCache, final EntityRef ownerId,
+                                               final String connectionType ) {
+        this.managerCache = managerCache;
+        this.ownerId = ownerId;
+        this.connectionType = connectionType;
+    }
+
+
+    /**
+     * Create a loader for the given scope.
+     *
+     * @param applicationScope The application scope passed to the loader
+     * @param scope The search edge passed to the loader
+     * @param resultsLevel The level of detail requested; selects the verifier
+     * @return A new {@link FilteringLoader} using the selected verifier
+     */
+    @Override
+    public ResultsLoader getLoader( final ApplicationScope applicationScope, final SearchEdge scope,
+                                    final Query.Level resultsLevel ) {
+
+        final ResultsVerifier verifier;
+
+        // NOTE(review): IDS uses the connection-ref verifier exactly like REFS, whereas
+        // CollectionResultsLoaderFactoryImpl uses IdsVerifier for IDS - confirm this is intended.
+        if ( resultsLevel == Query.Level.REFS || resultsLevel == Query.Level.IDS ) {
+            verifier = new ConnectionRefsVerifier( ownerId, connectionType );
+        }
+        else {
+            verifier = new EntityVerifier( Query.MAX_LIMIT );
+        }
+
+        return new FilteringLoader( managerCache, verifier, applicationScope, scope );
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d5978cca/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/elasticsearch/impl/ElasticSearchQueryExecutor.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/elasticsearch/impl/ElasticSearchQueryExecutor.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/elasticsearch/impl/ElasticSearchQueryExecutor.java
new file mode 100644
index 0000000..c7e8f56
--- /dev/null
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/elasticsearch/impl/ElasticSearchQueryExecutor.java
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.command.read.elasticsearch.impl;
+
+
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+import org.apache.usergrid.corepersistence.results.QueryExecutor;
+import org.apache.usergrid.persistence.index.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.persistence.Query;
+import org.apache.usergrid.persistence.Results;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+
+import com.google.common.base.Optional;
+
+
+/**
+ * Runs a query against an {@link ApplicationEntityIndex} and hands the outcome
+ * back one {@link Results} page per call to {@link #next()}.
+ *
+ * <p>The instance is its own iterator: {@link #iterator()} returns
+ * {@code this}, so every iterator obtained from one executor shares the same
+ * paging state.</p>
+ */
+public class ElasticSearchQueryExecutor implements Iterable<Results>, Iterator<Results> {
+
+    private static final Logger logger = LoggerFactory.getLogger( ElasticSearchQueryExecutor.class );
+
+    private final ResultsLoaderFactory resultsLoaderFactory;
+
+    private final ApplicationScope applicationScope;
+
+    private final ApplicationEntityIndex entityIndex;
+
+    private final SearchEdge indexScope;
+
+    private final SearchTypes types;
+
+    private final Query query;
+
+
+    /** The page loaded by hasNext() and not yet handed out by next(); null when none is pending. */
+    private Results currentResults;
+
+    /** False once a loaded page came back without a cursor. */
+    private boolean moreToLoad = true;
+
+
+    /**
+     * @param resultsLoaderFactory Factory for the loader that turns candidates into results
+     * @param entityIndex The index to search
+     * @param applicationScope The application scope passed to the results loader
+     * @param indexScope The search edge to search and to load results for
+     * @param types The search types passed to the index
+     * @param query The query to execute; it is copied, so the caller's instance is not modified
+     */
+    public ElasticSearchQueryExecutor( final ResultsLoaderFactory resultsLoaderFactory,
+                                       final ApplicationEntityIndex entityIndex,
+                                       final ApplicationScope applicationScope, final SearchEdge indexScope,
+                                       final SearchTypes types, final Query query ) {
+        this.resultsLoaderFactory = resultsLoaderFactory;
+        this.applicationScope = applicationScope;
+        this.entityIndex = entityIndex;
+        this.indexScope = indexScope;
+        this.types = types;
+
+        //we must deep copy the query passed.  Otherwise we will modify its state with cursors.  Won't fix,
+        //not relevant once we start subscribing to streams.
+        this.query = new Query( query );
+    }
+
+
+    @Override
+    public Iterator<Results> iterator() {
+        return this;
+    }
+
+
+    /**
+     * Load the next page into {@link #currentResults} and update {@link #moreToLoad}.
+     */
+    private void loadNextPage() {
+        // Because of possible stale entities, which are filtered out by buildResults(),
+        // a page of candidates can produce no results.  In that case we query again, up to maxQueries times.
+
+        final int maxQueries = 10; // max re-queries to satisfy query limit
+
+        final int originalLimit = query.getLimit();
+
+        Results results = null;
+        int queryCount = 0;
+
+
+        CandidateResults crs = null;
+
+        while ( queryCount++ < maxQueries ) {
+
+            crs = getCandidateResults( query );
+
+
+            logger.debug( "Calling build results with crs {}", crs );
+            results = buildResults( indexScope, query, crs );
+
+            // Stop as soon as we have something to return, or when the index cannot give us more
+            // (no candidates, or no offset to continue from).  Returning a short page rather than
+            // topping it up keeps us from exceeding the requested limit after stale entries were removed.
+            if ( crs.isEmpty() || !crs.hasOffset() || results.size() > 0 ) {
+                break;
+            }
+
+
+            // Every candidate on this page was discarded, but there is an offset to continue from.
+            // Query for more, asking only for what is still missing so we don't exceed the limit.
+            query.setOffsetFromCursor( results.getCursor() );
+            query.setLimit( originalLimit - results.size() );
+
+            logger.warn( "Satisfy query limit {}, new limit {} query count {}", new Object[] {
+                originalLimit, query.getLimit(), queryCount
+            } );
+        }
+
+        //now set our cursor if we have one for the next iteration
+        if ( results.hasCursor() ) {
+            query.setOffsetFromCursor( results.getCursor() );
+            moreToLoad = true;
+        }
+
+        else {
+            moreToLoad = false;
+        }
+
+
+        //set our select subjects into our query if provided
+        if ( crs != null ) {
+            query.setSelectSubjects( crs.getGetFieldMappings() );
+        }
+
+
+        //set our current results and the flag
+        this.currentResults = results;
+    }
+
+
+    /**
+     * Search the index, continuing from the query's offset when it has one.
+     *
+     * @param query The query supplying the query string (defaulting to "select *"), limit and offset
+     * @return The candidates returned by the index
+     */
+    private CandidateResults getCandidateResults( final Query query ) {
+        final Optional<Integer> cursor = query.getOffset();
+        final String queryToExecute = query.getQl().or( "select *" );
+
+        CandidateResults results = cursor.isPresent()
+            ? entityIndex.search( indexScope, types, queryToExecute, query.getLimit(), cursor.get() )
+            : entityIndex.search( indexScope, types, queryToExecute, query.getLimit() );
+
+        return results;
+    }
+
+
+    /**
+     * Build results from a set of candidates, and discard those that represent stale indexes.
+     * Also copies the candidates' offset (or its absence) into the query and into the results' cursor.
+     *
+     * @param indexScope The search edge the candidates came from
+     * @param query Query that was executed
+     * @param crs Candidates to be considered for results
+     * @return The loaded results
+     */
+    private Results buildResults( final SearchEdge indexScope, final Query query, final CandidateResults crs ) {
+
+        logger.debug( "buildResults()  from {} candidates", crs.size() );
+
+        //get an instance of our results loader
+        final ResultsLoader resultsLoader =
+            this.resultsLoaderFactory.getLoader( applicationScope, indexScope, query.getResultsLevel() );
+
+        //load the results
+        final Results results = resultsLoader.loadResults( crs );
+
+        //signal for post processing
+        resultsLoader.postProcess();
+
+        //set offset into query
+        if ( crs.getOffset().isPresent() ) {
+            query.setOffset( crs.getOffset().get() );
+        }
+        else {
+            query.clearOffset();
+        }
+        results.setCursorFromOffset( query.getOffset() );
+
+        logger.debug( "Returning results size {}", results.size() );
+
+        return results;
+    }
+
+
+    @Override
+    public boolean hasNext() {
+
+        //no page is pending; load the next one if there is more to load
+        if ( currentResults == null ) {
+            //there's nothing left to load, nothing to do
+            if ( !moreToLoad ) {
+                return false;
+            }
+
+            //load the page
+
+            loadNextPage();
+        }
+
+
+        //see if our current results are not null
+        return currentResults != null;
+    }
+
+
+    @Override
+    public Results next() {
+        if ( !hasNext() ) {
+            throw new NoSuchElementException( "No more results present" );
+        }
+
+        final Results toReturn = currentResults;
+
+        currentResults = null;
+
+        return toReturn;
+    }
+
+
+    /**
+     * Removal is not supported.
+     *
+     * @throws UnsupportedOperationException always
+     */
+    @Override
+    public void remove() {
+        // UnsupportedOperationException is what the Iterator contract specifies; it is a
+        // RuntimeException, so callers catching the previous type are unaffected.
+        throw new UnsupportedOperationException( "Remove not implemented!!" );
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d5978cca/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/elasticsearch/impl/EntityVerifier.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/elasticsearch/impl/EntityVerifier.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/elasticsearch/impl/EntityVerifier.java
new file mode 100644
index 0000000..7be8aa4
--- /dev/null
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/elasticsearch/impl/EntityVerifier.java
@@ -0,0 +1,127 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  *  contributor license agreements.  The ASF licenses this file to You
+ *  * under the Apache License, Version 2.0 (the "License"); you may not
+ *  * use this file except in compliance with the License.
+ *  * You may obtain a copy of the License at
+ *  *
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.  For additional information regarding
+ *  * copyright in this work, please see the NOTICE file in the top level
+ *  * directory of this distribution.
+ *
+ */
+
+package org.apache.usergrid.corepersistence.command.read.elasticsearch.impl;
+
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.corepersistence.util.CpEntityMapUtils;
+import org.apache.usergrid.persistence.Entity;
+import org.apache.usergrid.persistence.EntityFactory;
+import org.apache.usergrid.persistence.Results;
+import org.apache.usergrid.persistence.collection.EntityCollectionManager;
+import org.apache.usergrid.persistence.collection.EntitySet;
+import org.apache.usergrid.persistence.collection.MvccEntity;
+import org.apache.usergrid.persistence.index.CandidateResult;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.fasterxml.uuid.UUIDComparator;
+import com.google.common.base.Optional;
+
+
+/**
+ * A {@link ResultsVerifier} that loads the stored entities for a set of
+ * candidates, rejects candidates whose entity is missing, newer than the
+ * indexed version, or deleted, and returns the surviving entities.
+ */
+public class EntityVerifier implements ResultsVerifier {
+
+    private static final Logger logger = LoggerFactory.getLogger( EntityVerifier.class );
+
+    /** Entities loaded by the most recent loadResults() call; null until then. */
+    private EntitySet ids;
+
+    /** Entities of the candidates that passed isValid(), keyed by entity id. */
+    private final Map<Id, org.apache.usergrid.persistence.model.entity.Entity> entityMapping;
+
+
+    /**
+     * @param maxSize Initial capacity of the map holding validated entities
+     */
+    public EntityVerifier( final int maxSize ) {
+        this.entityMapping = new HashMap<>( maxSize );
+    }
+
+
+    /**
+     * Load the entities for the given ids.  Blocks until the load has completed.
+     *
+     * @param idsToLoad The ids to load
+     * @param ecm The collection manager to load them from
+     */
+    @Override
+    public void loadResults( final Collection<Id> idsToLoad, final EntityCollectionManager ecm ) {
+        ids = ecm.load( idsToLoad ).toBlocking().last();
+        logger.debug( "loadResults() asked for {} ids and got {}", idsToLoad.size(), ids.size() );
+    }
+
+
+    /**
+     * Check a candidate against the entities loaded by {@link #loadResults}.  A valid candidate's
+     * entity is remembered for {@link #getResults}.
+     *
+     * @param candidateResult The candidate to check
+     * @return false if no entity was loaded for the candidate, if the loaded version is newer than
+     * the candidate's version, or if the loaded entity has no body; true otherwise
+     */
+    @Override
+    public boolean isValid( final CandidateResult candidateResult ) {
+        final Id entityId = candidateResult.getId();
+
+        final MvccEntity savedEntity = ids.getEntity( entityId );
+
+        //nothing was loaded for this id, the candidate is not valid
+        if ( savedEntity == null ) {
+            logger.warn( "Version for Entity {}:{} not found", entityId.getType(), entityId.getUuid() );
+            return false;
+        }
+
+        final UUID candidateVersion = candidateResult.getVersion();
+        final UUID savedVersion = savedEntity.getVersion();
+
+        //the stored version is newer than the indexed one, so the candidate is stale
+        if ( UUIDComparator.staticCompare( savedVersion, candidateVersion ) > 0 ) {
+            //log the stored version, not the whole entity, for the "latest v" placeholder
+            logger.warn( "Stale version of Entity uuid:{} type:{}, stale v:{}, latest v:{}", new Object[] {
+                    entityId.getUuid(), entityId.getType(), candidateVersion, savedVersion
+            } );
+
+            return false;
+        }
+
+
+        final Optional<org.apache.usergrid.persistence.model.entity.Entity> entity = savedEntity.getEntity();
+
+        if ( !entity.isPresent() ) {
+            //log the stored version for the "version v" placeholder; the entity itself is absent here
+            logger.warn( "Entity uuid:{} version v:{} is deleted but indexed, this is a bug ",
+                    entityId.getUuid(), savedVersion );
+            return false;
+        }
+
+        entityMapping.put( entityId, entity.get() );
+
+        return true;
+    }
+
+
+    /**
+     * Convert the remembered entities for the given ids into results, in the iteration order of the ids.
+     *
+     * @param ids The ids to return; each is looked up in the entities remembered by {@link #isValid}
+     * @return Results created from the converted entities
+     */
+    @Override
+    public Results getResults( final Collection<Id> ids ) {
+
+        final List<Entity> ugEntities = new ArrayList<>( ids.size() );
+
+        for ( final Id id : ids ) {
+            final org.apache.usergrid.persistence.model.entity.Entity cpEntity = entityMapping.get( id );
+
+            Entity entity = EntityFactory.newEntity( id.getUuid(), id.getType() );
+
+            Map<String, Object> entityMap = CpEntityMapUtils.toMap( cpEntity );
+            entity.addProperties( entityMap );
+            ugEntities.add( entity );
+        }
+
+        return Results.fromEntities( ugEntities );
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d5978cca/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/elasticsearch/impl/FilteringLoader.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/elasticsearch/impl/FilteringLoader.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/elasticsearch/impl/FilteringLoader.java
new file mode 100644
index 0000000..c0afe92
--- /dev/null
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/elasticsearch/impl/FilteringLoader.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.command.read.elasticsearch.impl;
+
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.UUID;
+
+import javax.annotation.Nullable;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.corepersistence.ManagerCache;
+import org.apache.usergrid.persistence.Results;
+import org.apache.usergrid.persistence.collection.EntityCollectionManager;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.index.ApplicationEntityIndex;
+import org.apache.usergrid.persistence.index.EntityIndexBatch;
+import org.apache.usergrid.persistence.index.SearchEdge;
+import org.apache.usergrid.persistence.index.CandidateResult;
+import org.apache.usergrid.persistence.index.CandidateResults;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.fasterxml.uuid.UUIDComparator;
+import com.google.common.base.Function;
+import com.google.common.collect.Collections2;
+
+
+public class FilteringLoader implements ResultsLoader {
+
+    private static final Logger logger = LoggerFactory.getLogger( 
FilteringLoader.class );
+
+    private final ManagerCache managerCache;
+    private final ResultsVerifier resultsVerifier;
+    private final ApplicationScope applicationScope;
+    private final SearchEdge indexScope;
+    private final EntityIndexBatch indexBatch;
+
+
+    /**
+     * Create an instance of a filter loader
+     *
+     * @param managerCache The manager cache to load
+     * @param resultsVerifier The verifier to verify the candidate results
+     * @param applicationScope The application scope to perform the load
+     * @param indexScope The index scope used in the search
+     */
+    protected FilteringLoader( final ManagerCache managerCache, final 
ResultsVerifier resultsVerifier,
+                               final ApplicationScope applicationScope, final 
SearchEdge indexScope ) {
+
+        this.managerCache = managerCache;
+        this.resultsVerifier = resultsVerifier;
+        this.applicationScope = applicationScope;
+        this.indexScope = indexScope;
+
+        final ApplicationEntityIndex index = managerCache.getEntityIndex( 
applicationScope );
+
+        indexBatch = index.createBatch();
+    }
+
+
+    @Override
+    public Results loadResults( final CandidateResults crs ) {
+
+
+        if ( crs.size() == 0 ) {
+            return new Results();
+        }
+
+
+        // For each entity, holds the index it appears in our candidates for 
keeping ordering correct
+        final Map<Id, Integer> orderIndex = new HashMap<>( crs.size() );
+
+        // Maps the entity ids to our candidates
+        final Map<Id, CandidateResult> maxCandidateMapping = new HashMap<>( 
crs.size() );
+
+
+        final Iterator<CandidateResult> iter = crs.iterator();
+
+
+        // TODO, in this case we're "optimizing" due to the limitations of 
collection scope.
+        // Perhaps  we should change the API to just be an application, then 
an "owner" scope?
+
+        // Go through the candidates and group them by scope for more 
efficient retrieval.
+        // Also remove duplicates before we even make a network call
+        for ( int i = 0; iter.hasNext(); i++ ) {
+
+            final CandidateResult currentCandidate = iter.next();
+
+            final Id entityId = currentCandidate.getId();
+
+            //check if we've seen this candidate by id
+            final CandidateResult previousMax = maxCandidateMapping.get( 
entityId );
+
+            //its not been seen, save it
+            if ( previousMax == null ) {
+                maxCandidateMapping.put( entityId, currentCandidate );
+                orderIndex.put( entityId, i );
+                continue;
+            }
+
+            //we have seen it, compare them
+
+            final UUID previousMaxVersion = previousMax.getVersion();
+
+            final UUID currentVersion = currentCandidate.getVersion();
+
+
+            final CandidateResult toRemove;
+            final CandidateResult toKeep;
+
+            //current is newer than previous.  Remove previous and keep current
+            if ( UUIDComparator.staticCompare( currentVersion, 
previousMaxVersion ) > 0 ) {
+                toRemove = previousMax;
+                toKeep = currentCandidate;
+            }
+            //previously seen value is newer than current.  Remove the current 
and keep the previously seen value
+            else {
+                toRemove = currentCandidate;
+                toKeep = previousMax;
+            }
+
+            //this is a newer version, we know we already have a stale entity, 
add it to be cleaned up
+
+
+            //de-index it
+            logger.warn( "Stale version of Entity uuid:{} type:{}, stale v:{}, 
latest v:{}", new Object[] {
+                    entityId.getUuid(), entityId.getType(), 
toRemove.getVersion(), toKeep.getVersion()
+                } );
+
+            //deindex this document, and remove the previous maxVersion
+            //we have to deindex this from our ownerId, since this is what 
gave us the reference
+            indexBatch.deindex( indexScope, toRemove );
+
+
+            //TODO, fire the entity repair cleanup task here instead of 
de-indexing
+
+            //replace the value with a more current version
+            maxCandidateMapping.put( entityId, toKeep );
+            orderIndex.put( entityId, i );
+        }
+
+
+        //now everything is ordered, and older versions are removed.  Batch 
fetch versions to verify
+        // existence and correct versions
+
+        final TreeMap<Integer, Id> sortedResults = new TreeMap<>();
+
+
+        final Collection<Id> idsToLoad =
+            Collections2.transform( maxCandidateMapping.values(), new 
Function<CandidateResult, Id>() {
+                @Nullable
+                @Override
+                public Id apply( @Nullable final CandidateResult input ) {
+                    //NOTE this is never null, we won't need to check
+                    return input.getId();
+                }
+            } );
+
+
+        //now using the scope, load the collection
+
+        // Get the collection scope and batch load all the versions.  We put 
all entities in
+        // app/app for easy retrieval/ unless persistence changes, we never 
want to read from
+        // any scope other than the app, app, scope name scope
+        //            final CollectionScope collScope = new 
CollectionScopeImpl(
+        //                applicationScope.getApplication(), 
applicationScope.getApplication(), scopeName);
+
+        final EntityCollectionManager ecm = 
managerCache.getEntityCollectionManager( applicationScope );
+
+
+        //load the results into the loader for this scope for validation
+        resultsVerifier.loadResults( idsToLoad, ecm );
+
+        //now let the loader validate each candidate.  For instance, the "max" 
in this candidate
+        //could still be a stale result, so it needs validated
+        for ( final Id requestedId : idsToLoad ) {
+
+            final CandidateResult cr = maxCandidateMapping.get( requestedId );
+
+            //ask the loader if this is valid, if not discard it and de-index 
it
+            if ( !resultsVerifier.isValid( cr ) ) {
+                indexBatch.deindex( indexScope, cr );
+                continue;
+            }
+
+            //if we get here we're good, we need to add this to our results
+            final int candidateIndex = orderIndex.get( requestedId );
+
+            sortedResults.put( candidateIndex, requestedId );
+        }
+
+
+        // NOTE DO NOT execute the batch here.
+        // It changes the results and we need consistent paging until we 
aggregate all results
+        return resultsVerifier.getResults( sortedResults.values() );
+    }
+
+
+    @Override
+    public void postProcess() {
+        this.indexBatch.execute();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d5978cca/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/elasticsearch/impl/IdsVerifier.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/elasticsearch/impl/IdsVerifier.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/elasticsearch/impl/IdsVerifier.java
new file mode 100644
index 0000000..4d2bd55
--- /dev/null
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/elasticsearch/impl/IdsVerifier.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.command.read.elasticsearch.impl;
+
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.usergrid.persistence.Results;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+
+/**
+ * Verifier that reports its results as a plain list of entity UUIDs.  Loading
+ * and validation are inherited from {@link VersionVerifier}.
+ */
+public class IdsVerifier extends VersionVerifier {
+
+    /**
+     * Build results holding the UUID of each id, in the iteration order of the collection.
+     *
+     * @param ids The ids to return
+     * @return Results created from the list of UUIDs
+     */
+    @Override
+    public Results getResults( final Collection<Id> ids ) {
+        return Results.fromIdList( uuidsOf( ids ) );
+    }
+
+
+    /**
+     * Extract the UUID of every id into a new list sized to the input.
+     */
+    private static List<UUID> uuidsOf( final Collection<Id> ids ) {
+        final List<UUID> uuids = new ArrayList<>( ids.size() );
+
+        for ( final Id entityId : ids ) {
+            uuids.add( entityId.getUuid() );
+        }
+
+        return uuids;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d5978cca/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/elasticsearch/impl/ResultsLoader.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/elasticsearch/impl/ResultsLoader.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/elasticsearch/impl/ResultsLoader.java
new file mode 100644
index 0000000..fa0e71f
--- /dev/null
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/elasticsearch/impl/ResultsLoader.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.command.read.elasticsearch.impl;
+
+
+import org.apache.usergrid.persistence.Results;
+import org.apache.usergrid.persistence.index.CandidateResults;
+
+
+/**
+ * Turns a set of search candidates into a results object.
+ */
+public interface ResultsLoader {
+
+    /**
+     * Load our results from the supplied candidates.  Implementations should filter stale results.
+     *
+     * @param crs The candidate result set
+     * @return Results.  Null safe, but may be empty
+     */
+    Results loadResults( CandidateResults crs );
+
+    /**
+     * Post process the load operation.
+     */
+    void postProcess();
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d5978cca/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/elasticsearch/impl/ResultsLoaderFactory.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/elasticsearch/impl/ResultsLoaderFactory.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/elasticsearch/impl/ResultsLoaderFactory.java
new file mode 100644
index 0000000..14db80e
--- /dev/null
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/elasticsearch/impl/ResultsLoaderFactory.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.command.read.elasticsearch.impl;
+
+
+import org.apache.usergrid.persistence.Query;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.index.SearchEdge;
+
+
+/**
+ * Factory that creates {@link ResultsLoader} instances.
+ */
+public interface ResultsLoaderFactory {
+
+    /**
+     * Get the loader for results.
+     *
+     * @param applicationScope The application scope used to load results
+     * @param indexScope The index scope used in the search
+     * @param resultsLevel The {@link Query.Level} requested for the loaded results
+     * @return The loader to use for the given scope, search edge and results level
+     */
+    ResultsLoader getLoader( ApplicationScope applicationScope, SearchEdge indexScope, Query.Level resultsLevel );
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d5978cca/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/elasticsearch/impl/ResultsVerifier.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/elasticsearch/impl/ResultsVerifier.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/elasticsearch/impl/ResultsVerifier.java
new file mode 100644
index 0000000..68515fa
--- /dev/null
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/elasticsearch/impl/ResultsVerifier.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.command.read.elasticsearch.impl;
+
+
+import java.util.Collection;
+import org.apache.usergrid.persistence.Results;
+import org.apache.usergrid.persistence.collection.EntityCollectionManager;
+import org.apache.usergrid.persistence.index.CandidateResult;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+
+/**
+ * Verifies candidate results and builds the final results from the ids that were retained.
+ */
+public interface ResultsVerifier {
+
+    /**
+     * Load all the candidate ids for verification.
+     *
+     * @param ids The ids to load
+     * @param ecm The entity collection manager
+     */
+    void loadResults( Collection<Id> ids, EntityCollectionManager ecm );
+
+    /**
+     * Check whether a candidate result should be retained.  A candidate that should not be retained should
+     * also be removed from the list of possible return values in this loader.
+     *
+     * @param candidateResult The candidate to check
+     * @return true if the candidate result is a valid result that should be retained
+     */
+    boolean isValid( CandidateResult candidateResult );
+
+
+    /**
+     * Load the result set with the given ids.
+     *
+     * @param ids The ids to build the results from
+     * @return The results for the given ids
+     */
+    Results getResults( Collection<Id> ids );
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d5978cca/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/elasticsearch/impl/VersionVerifier.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/elasticsearch/impl/VersionVerifier.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/elasticsearch/impl/VersionVerifier.java
new file mode 100644
index 0000000..58e4296
--- /dev/null
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/elasticsearch/impl/VersionVerifier.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.command.read.elasticsearch.impl;
+
+
+import java.util.Collection;
+import java.util.UUID;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.persistence.collection.EntityCollectionManager;
+import org.apache.usergrid.persistence.collection.MvccLogEntry;
+import org.apache.usergrid.persistence.collection.VersionSet;
+import org.apache.usergrid.persistence.index.CandidateResult;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.fasterxml.uuid.UUIDComparator;
+
+
+/**
+ * A verifier that checks each candidate's version against the latest version returned by the entity
+ * collection manager, rejecting candidates that are missing or stale.
+ */
+public abstract class VersionVerifier implements ResultsVerifier {
+
+    private static final Logger logger = LoggerFactory.getLogger( VersionVerifier.class );
+
+    /** Latest versions fetched by loadResults.  isValid reads this, so loadResults must be called first. */
+    private VersionSet ids;
+
+
+    /**
+     * Fetch the latest version of every supplied id and keep it for later isValid calls.  Blocks until the
+     * observable returned by the entity collection manager completes, and keeps the last value it emitted.
+     *
+     * @param idsToLoad The ids whose latest versions should be fetched
+     * @param ecm The entity collection manager used to fetch them
+     */
+    @Override
+    public void loadResults( final Collection<Id> idsToLoad, final EntityCollectionManager ecm ) {
+        ids = ecm.getLatestVersion( idsToLoad ).toBlocking().last();
+    }
+
+
+    /**
+     * Check a candidate against the versions fetched by loadResults.
+     *
+     * @param candidateResult The candidate to check
+     * @return false if no version was found for the candidate's id, or if the stored version is newer than
+     * the candidate's version; true otherwise
+     */
+    @Override
+    public boolean isValid( final CandidateResult candidateResult ) {
+        final Id entityId = candidateResult.getId();
+
+        final MvccLogEntry version = ids.getMaxVersion( entityId );
+
+        //version wasn't found, the candidate can't be verified so reject it
+        if ( version == null ) {
+            logger.warn( "Version for Entity {}:{} not found",
+                    entityId.getUuid(), entityId.getType() );
+
+            return false;
+        }
+
+        final UUID savedVersion = version.getVersion();
+
+        //a positive comparison means the stored version is newer than the one in the candidate, so it's stale
+        if ( UUIDComparator.staticCompare( savedVersion, candidateResult.getVersion() ) > 0 ) {
+            logger.debug( "Stale version of Entity uuid:{} type:{}, stale v:{}, latest v:{}",
+                new Object[] {
+                    entityId.getUuid(),
+                    entityId.getType(),
+                    candidateResult.getVersion(),
+                    savedVersion
+            } );
+
+            return false;
+        }
+
+        return true;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d5978cca/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/entity/EntityLoadCollector.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/entity/EntityLoadCollector.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/entity/EntityLoadCollector.java
new file mode 100644
index 0000000..1504f26
--- /dev/null
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/entity/EntityLoadCollector.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.command.read.entity;
+
+
+import java.io.Serializable;
+import java.util.Map;
+
+import org.apache.usergrid.corepersistence.command.read.AbstractCommand;
+import org.apache.usergrid.corepersistence.command.read.Collector;
+import org.apache.usergrid.corepersistence.util.CpEntityMapUtils;
+import org.apache.usergrid.persistence.EntityFactory;
+import org.apache.usergrid.persistence.Results;
+import org.apache.usergrid.persistence.collection.EntityCollectionManager;
+import 
org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
+import org.apache.usergrid.persistence.collection.EntitySet;
+import org.apache.usergrid.persistence.collection.MvccEntity;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.model.entity.Entity;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import rx.Observable;
+
+
+/**
+ * Loads entities from a set of Ids.
+ *
+ * TODO refactor this into a common command that both ES search and graphSearch can use for repair and verification
+ */
+public class EntityLoadCollector extends AbstractCommand<Results, Serializable> implements Collector<Results> {
+
+    private final EntityCollectionManagerFactory entityCollectionManagerFactory;
+
+    //TODO get rid of this when merged into 2.0 dev
+    private final ApplicationScope applicationScope;
+
+    /**
+     * Number of ids buffered into each load call; assigned by setLimit.
+     * NOTE(review): stays 0 if setLimit is never called - confirm callers always set a limit before call().
+     */
+    private int resultSize;
+
+
+    /**
+     * @param entityCollectionManagerFactory Factory used to create the collection manager that loads entities
+     * @param applicationScope The application scope the collection manager is created for
+     */
+    public EntityLoadCollector( final EntityCollectionManagerFactory entityCollectionManagerFactory,
+                                final ApplicationScope applicationScope ) {
+        this.entityCollectionManagerFactory = entityCollectionManagerFactory;
+        this.applicationScope = applicationScope;
+    }
+
+
+    /**
+     * This collector declares no cursor class.
+     *
+     * @return always null
+     */
+    @Override
+    protected Class<Serializable> getCursorClass() {
+        return null;
+    }
+
+
+    /**
+     * Buffer the incoming ids into batches of the configured limit, load each batch, and emit one Results
+     * per batch containing the entities that were found.
+     *
+     * @param observable The ids to load
+     * @return An observable of results, one per loaded batch
+     */
+    @Override
+    public Observable<Results> call( final Observable<Id> observable ) {
+
+        // A bit kludgy from old 1.0 -> 2.0 apis.  Refactor this as we clean up our lower levels and create new
+        // results objects
+
+        final EntityCollectionManager entityCollectionManager =
+            entityCollectionManagerFactory.createCollectionManager( applicationScope );
+
+        //batch the ids, then load each batch as a single entity set
+        final Observable<EntitySet> entitySetObservable =
+            observable.buffer( resultSize ).flatMap( bufferedIds -> entityCollectionManager.load( bufferedIds ) );
+
+
+        final Observable<Results> resultsObservable = entitySetObservable
+
+            .flatMap( entitySet -> {
+
+            //get our entities and filter missing ones, then collect them into a results object
+            final Observable<MvccEntity> mvccEntityObservable = Observable.from( entitySet.getEntities() );
+
+            //convert them to our old entity model, then keep only the non-null ones; null means not found
+            return mvccEntityObservable.map( mvccEntity -> mapEntity( mvccEntity ) )
+                .filter( entity -> entity != null )
+
+                //convert them to a list, then map them into results
+                .toList().map( entities -> {
+                    final Results results = Results.fromEntities( entities );
+                    results.setCursor( generateCursor() );
+
+                    return results;
+                } )
+                //if no results are present, return an empty results
+                .singleOrDefault( new Results(  ) );
+        } );
+
+
+        return resultsObservable;
+    }
+
+
+    /**
+     * Map a new cp entity to an old entity.
+     *
+     * @param mvccEntity The loaded mvcc entity
+     * @return The old-model entity carrying the cp entity's properties, or null if the entity is not present
+     */
+    private org.apache.usergrid.persistence.Entity mapEntity( final MvccEntity mvccEntity ) {
+        if ( !mvccEntity.getEntity().isPresent() ) {
+            return null;
+        }
+
+
+        final Entity cpEntity = mvccEntity.getEntity().get();
+        final Id entityId = cpEntity.getId();
+
+        org.apache.usergrid.persistence.Entity entity =
+            EntityFactory.newEntity( entityId.getUuid(), entityId.getType() );
+
+        Map<String, Object> entityMap = CpEntityMapUtils.toMap( cpEntity );
+        entity.addProperties( entityMap );
+
+        return entity;
+    }
+
+
+    /**
+     * Set the number of ids loaded per batch, and therefore the maximum number of entities per emitted Results.
+     *
+     * @param limit The batch size
+     */
+    @Override
+    public void setLimit( final int limit ) {
+        this.resultSize = limit;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d5978cca/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/entity/EntityLoadCommand.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/entity/EntityLoadCommand.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/entity/EntityLoadCommand.java
deleted file mode 100644
index 08193ef..0000000
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/entity/EntityLoadCommand.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.command.read.entity;
-
-
-import java.io.Serializable;
-import java.util.Map;
-
-import org.apache.usergrid.corepersistence.command.read.AbstractCommand;
-import org.apache.usergrid.corepersistence.command.read.CollectCommand;
-import org.apache.usergrid.corepersistence.util.CpEntityMapUtils;
-import org.apache.usergrid.persistence.EntityFactory;
-import org.apache.usergrid.persistence.Results;
-import org.apache.usergrid.persistence.collection.EntityCollectionManager;
-import 
org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
-import org.apache.usergrid.persistence.collection.EntitySet;
-import org.apache.usergrid.persistence.collection.MvccEntity;
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.model.entity.Entity;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-import rx.Observable;
-
-
-/**
- * Loads entities from a set of Ids.
- *
- * TODO refactor this into a common command that both ES search and 
graphSearch can use for repair and verification
- */
-public class EntityLoadCommand extends AbstractCommand<Results, Serializable> 
implements CollectCommand<Results> {
-
-    private final EntityCollectionManagerFactory 
entityCollectionManagerFactory;
-
-    //TODO get rid of this when merged into 2.0 dev
-    private final ApplicationScope applicationScope;
-    private int resultSize;
-
-
-    public EntityLoadCommand( final EntityCollectionManagerFactory 
entityCollectionManagerFactory,
-                              final ApplicationScope applicationScope ) {
-        this.entityCollectionManagerFactory = entityCollectionManagerFactory;
-        this.applicationScope = applicationScope;
-    }
-
-
-    @Override
-    protected Class<Serializable> getCursorClass() {
-        return null;
-    }
-
-
-    @Override
-    public Observable<Results> call( final Observable<Id> observable ) {
-
-
-        /**
-         * A bit kludgy from old 1.0 -> 2.0 apis.  Refactor this as we clean 
up our lower levels and create new results
-         * objects
-         */
-
-        final EntityCollectionManager entityCollectionManager =
-            entityCollectionManagerFactory.createCollectionManager( 
applicationScope );
-
-        final Observable<EntitySet> entitySetObservable = observable.buffer( 
resultSize ).flatMap(
-            bufferedIds -> Observable.just( bufferedIds ).flatMap( ids -> 
entityCollectionManager.load( ids ) ) );
-
-
-        return entitySetObservable
-
-            .flatMap( entitySet -> {
-
-                //get our entites and filter missing ones, then collect them 
into a results object
-                final Observable<MvccEntity> mvccEntityObservable = 
Observable.from( entitySet.getEntities() );
-
-                //convert them to our old entity model, then filter nulls, 
meaning they weren't found
-                return mvccEntityObservable.map( mvccEntity -> mapEntity( 
mvccEntity ) ).filter(
-                    entity -> entity == null )
-
-                    //convert them to a list, then map them into results
-                    .toList().map( entities -> {
-                        final Results results = Results.fromEntities( entities 
);
-                        results.setCursor( generateCursor() );
-
-                        return results;
-                    } );
-            } );
-    }
-
-                /**
-                 * Map a new cp entity to an old entity.  May be null if not 
present
-                 */
-
-
-    private org.apache.usergrid.persistence.Entity mapEntity( final MvccEntity 
mvccEntity ) {
-        if ( !mvccEntity.getEntity().isPresent() ) {
-            return null;
-        }
-
-
-        final Entity cpEntity = mvccEntity.getEntity().get();
-        final Id entityId = cpEntity.getId();
-
-        org.apache.usergrid.persistence.Entity entity =
-            EntityFactory.newEntity( entityId.getUuid(), entityId.getType() );
-
-        Map<String, Object> entityMap = CpEntityMapUtils.toMap( cpEntity );
-        entity.addProperties( entityMap );
-
-        return entity;
-    }
-
-
-    @Override
-    public void setLimit( final int resultSize ) {
-        this.resultSize = resultSize;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d5978cca/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/graph/ReadGraphCollectionCommand.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/graph/ReadGraphCollectionCommand.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/graph/ReadGraphCollectionCommand.java
index 9ccb969..3b6633b 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/graph/ReadGraphCollectionCommand.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/graph/ReadGraphCollectionCommand.java
@@ -22,6 +22,8 @@ package 
org.apache.usergrid.corepersistence.command.read.graph;
 
 import org.apache.usergrid.persistence.graph.GraphManagerFactory;
 
+import com.google.inject.Inject;
+
 import static 
org.apache.usergrid.corepersistence.util.CpNamingUtils.getCollectionScopeNameFromCollectionName;
 
 
@@ -36,6 +38,7 @@ public class ReadGraphCollectionCommand extends 
AbstractReadGraphCommand {
     /**
      * Create a new instance of our command
      */
+    @Inject
     public ReadGraphCollectionCommand( final GraphManagerFactory 
graphManagerFactory, final String collectionName ) {
         super( graphManagerFactory );
         this.collectionName = collectionName;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d5978cca/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/graph/ReadGraphConnectionCommand.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/graph/ReadGraphConnectionCommand.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/graph/ReadGraphConnectionCommand.java
index adebd45..26b30e7 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/graph/ReadGraphConnectionCommand.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/graph/ReadGraphConnectionCommand.java
@@ -22,6 +22,8 @@ package 
org.apache.usergrid.corepersistence.command.read.graph;
 
 import org.apache.usergrid.persistence.graph.GraphManagerFactory;
 
+import com.google.inject.Inject;
+
 import static 
org.apache.usergrid.corepersistence.util.CpNamingUtils.getConnectionScopeName;
 
 
@@ -36,6 +38,7 @@ public class ReadGraphConnectionCommand extends 
AbstractReadGraphCommand {
     /**
      * Create a new instance of our command
      */
+    @Inject
     public ReadGraphConnectionCommand( final GraphManagerFactory 
graphManagerFactory, final String connectionName ) {
         super( graphManagerFactory );
         this.connectionName = connectionName;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d5978cca/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/AbstractGraphQueryExecutor.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/AbstractGraphQueryExecutor.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/AbstractGraphQueryExecutor.java
index cf0764e..ec4271a 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/AbstractGraphQueryExecutor.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/AbstractGraphQueryExecutor.java
@@ -25,7 +25,7 @@ import java.util.Iterator;
 import java.util.NoSuchElementException;
 
 import org.apache.usergrid.corepersistence.command.CommandBuilder;
-import 
org.apache.usergrid.corepersistence.command.read.entity.EntityLoadCommand;
+import 
org.apache.usergrid.corepersistence.command.read.entity.EntityLoadCollector;
 import org.apache.usergrid.persistence.EntityRef;
 import org.apache.usergrid.persistence.Results;
 import 
org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
@@ -89,7 +89,7 @@ public abstract class AbstractGraphQueryExecutor implements 
QueryExecutor {
 
             //construct our results to be observed later. This is a cold 
observable
             final Observable<Results> resultsObservable =
-                commandBuilder.build( new EntityLoadCommand( 
entityCollectionManagerFactory, applicationScope ) );
+                commandBuilder.build( new EntityLoadCollector( 
entityCollectionManagerFactory, applicationScope ) );
 
             this.observableIterator = 
resultsObservable.toBlocking().getIterator();
 

Reply via email to