Repository: incubator-usergrid
Updated Branches:
  refs/heads/USERGRID-593 9dc65dc8c -> 23acdd509


Query by edges collection is working.

ES query by collection is not working.

Cursor generation is not implemented.


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/23acdd50
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/23acdd50
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/23acdd50

Branch: refs/heads/USERGRID-593
Commit: 23acdd509ffd5b423d7cdf5c58a16ca062ab9e5a
Parents: 9dc65dc
Author: Todd Nine <tn...@apigee.com>
Authored: Mon Apr 27 10:38:06 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Mon Apr 27 10:38:06 2015 -0600

----------------------------------------------------------------------
 .../usergrid/corepersistence/CoreModule.java    |   4 +
 .../corepersistence/CpEntityManager.java        |  15 +-
 .../corepersistence/CpEntityManagerFactory.java |  11 +-
 .../corepersistence/CpRelationManager.java      |  11 +-
 .../pipeline/PipelineModule.java                |  17 ++-
 .../corepersistence/pipeline/read/Filter.java   |   6 +-
 .../pipeline/read/FilterFactory.java            | 102 -------------
 .../pipeline/read/ReadFilterFactory.java        | 102 +++++++++++++
 .../pipeline/read/ReadFilterFactoryImpl.java    | 152 +++++++++++++++++++
 .../pipeline/read/ReadPipelineBuilder.java      |   3 +-
 .../pipeline/read/ReadPipelineBuilderImpl.java  |  31 ++--
 ...stractQueryElasticSearchCollectorFilter.java |  14 +-
 ...yCollectionElasticSearchCollectorFilter.java |  20 ++-
 ...yConnectionElasticSearchCollectorFilter.java |  27 ++--
 .../read/entity/EntityLoadCollectorFilter.java  |  11 +-
 .../read/graph/AbstractReadGraphFilter.java     |  36 ++---
 .../graph/ReadGraphCollectionByIdFilter.java    |   2 +
 .../read/graph/ReadGraphCollectionFilter.java   |   4 +-
 .../graph/ReadGraphConnectionByIdFilter.java    |   2 +
 .../graph/ReadGraphConnectionByTypeFilter.java  |   8 +-
 .../read/graph/ReadGraphConnectionFilter.java   |   4 +-
 .../results/ObservableQueryExecutor.java        |   3 +-
 .../corepersistence/util/CpNamingUtils.java     |  20 +--
 .../graph/test/util/EdgeTestUtils.java          |   2 +-
 .../setup/ConcurrentProcessSingleton.java       |   6 +-
 25 files changed, 390 insertions(+), 223 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/23acdd50/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
index 06fe058..ca20e23 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
@@ -31,6 +31,7 @@ import 
org.apache.usergrid.corepersistence.migration.CoreMigration;
 import org.apache.usergrid.corepersistence.migration.CoreMigrationPlugin;
 import 
org.apache.usergrid.corepersistence.migration.EntityTypeMappingMigration;
 import 
org.apache.usergrid.corepersistence.migration.MigrationModuleVersionPlugin;
+import org.apache.usergrid.corepersistence.pipeline.PipelineModule;
 import org.apache.usergrid.corepersistence.rx.impl.AllApplicationsObservable;
 import 
org.apache.usergrid.corepersistence.rx.impl.AllApplicationsObservableImpl;
 import org.apache.usergrid.corepersistence.rx.impl.AllEntitiesInSystemImpl;
@@ -158,6 +159,9 @@ public class CoreModule  extends AbstractModule {
 
         install( new GuicyFigModule( ApplicationIdCacheFig.class ) );
 
+        //install our pipeline modules
+        install(new PipelineModule());
+
     }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/23acdd50/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
index 748a069..2d419b6 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
@@ -38,6 +38,7 @@ import org.slf4j.LoggerFactory;
 import org.springframework.util.Assert;
 
 import org.apache.usergrid.corepersistence.index.AsyncIndexService;
+import org.apache.usergrid.corepersistence.pipeline.PipelineBuilderFactory;
 import org.apache.usergrid.corepersistence.util.CpEntityMapUtils;
 import org.apache.usergrid.corepersistence.util.CpNamingUtils;
 import org.apache.usergrid.persistence.AggregateCounter;
@@ -179,9 +180,7 @@ public class CpEntityManager implements EntityManager {
 
     private final AsyncIndexService indexService;
 
-
-    private EntityCollectionManagerFactory entityCollectionManagerFactory;
-    private GraphManagerFactory graphManagerFactory;
+    private PipelineBuilderFactory pipelineBuilderFactory;
 
     private boolean skipAggregateCounters;
     private MetricsFactory metricsFactory;
@@ -224,16 +223,16 @@ public class CpEntityManager implements EntityManager {
      * @param graphManagerFactory
      */
     public CpEntityManager( final CassandraService cass, final CounterUtils 
counterUtils, final AsyncIndexService indexService, final ManagerCache 
managerCache,
-                            final MetricsFactory metricsFactory,final 
EntityCollectionManagerFactory entityCollectionManagerFactory,
-                                                        final 
GraphManagerFactory graphManagerFactory , final UUID applicationId ) {
-        this.entityCollectionManagerFactory = entityCollectionManagerFactory;
-        this.graphManagerFactory = graphManagerFactory;
+                            final MetricsFactory metricsFactory,final 
PipelineBuilderFactory pipelineBuilderFactory , final UUID applicationId ) {
+
 
         Preconditions.checkNotNull( cass, "cass must not be null" );
         Preconditions.checkNotNull( counterUtils, "counterUtils must not be 
null" );
         Preconditions.checkNotNull( managerCache, "managerCache must not be 
null" );
         Preconditions.checkNotNull( applicationId, "applicationId must not be 
null" );
         Preconditions.checkNotNull( indexService, "indexService must not be 
null" );
+        Preconditions.checkNotNull( pipelineBuilderFactory, 
"pipelineBuilderFactory must not be null" );
+        this.pipelineBuilderFactory = pipelineBuilderFactory;
 
 
         this.managerCache = managerCache;
@@ -747,7 +746,7 @@ public class CpEntityManager implements EntityManager {
         Preconditions.checkNotNull( entityRef, "entityRef cannot be null" );
 
         CpRelationManager relationManager =
-            new CpRelationManager( metricsFactory, managerCache, 
entityCollectionManagerFactory, graphManagerFactory, indexService, this, 
applicationId, entityRef );
+            new CpRelationManager( metricsFactory, managerCache, 
pipelineBuilderFactory, indexService, this, applicationId, entityRef );
         return relationManager;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/23acdd50/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
index f06cf40..0d10fd2 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
@@ -35,6 +35,7 @@ import org.apache.commons.lang.StringUtils;
 
 import org.apache.usergrid.corepersistence.index.AsyncIndexService;
 import org.apache.usergrid.corepersistence.index.ReIndexService;
+import org.apache.usergrid.corepersistence.pipeline.PipelineBuilderFactory;
 import org.apache.usergrid.corepersistence.util.CpNamingUtils;
 import org.apache.usergrid.exception.ConflictException;
 import org.apache.usergrid.persistence.AbstractEntity;
@@ -74,6 +75,7 @@ import org.apache.usergrid.persistence.model.entity.SimpleId;
 import org.apache.usergrid.persistence.model.util.UUIDGenerator;
 import org.apache.usergrid.utils.UUIDUtils;
 
+import com.amazonaws.services.elastictranscoder.model.Pipeline;
 import com.google.common.base.Optional;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
@@ -118,8 +120,6 @@ public class CpEntityManagerFactory implements 
EntityManagerFactory, Application
     private final ApplicationIdCache applicationIdCache;
 
     private ManagerCache managerCache;
-    private final EntityCollectionManagerFactory 
entityCollectionManagerFactory;
-    private final GraphManagerFactory graphManagerFactory;
 
     private CassandraService cassandraService;
     private CounterUtils counterUtils;
@@ -127,6 +127,7 @@ public class CpEntityManagerFactory implements 
EntityManagerFactory, Application
     private final EntityIndex entityIndex;
     private final MetricsFactory metricsFactory;
     private final AsyncIndexService indexService;
+    private final PipelineBuilderFactory pipelineBuilderFactory;
 
     public CpEntityManagerFactory( final CassandraService cassandraService, 
final CounterUtils counterUtils,
                                    final Injector injector ) {
@@ -139,10 +140,9 @@ public class CpEntityManagerFactory implements 
EntityManagerFactory, Application
         this.managerCache = injector.getInstance( ManagerCache.class );
         this.metricsFactory = injector.getInstance( MetricsFactory.class );
         this.indexService = injector.getInstance( AsyncIndexService.class );
+        this.pipelineBuilderFactory = injector.getInstance( 
PipelineBuilderFactory.class );
         this.applicationIdCache = 
injector.getInstance(ApplicationIdCacheFactory.class).getInstance(
             getManagementEntityManager() );
-        this.entityCollectionManagerFactory = injector.getInstance( 
EntityCollectionManagerFactory.class );
-        this.graphManagerFactory = injector.getInstance( 
GraphManagerFactory.class );
 
 
     }
@@ -199,8 +199,7 @@ public class CpEntityManagerFactory implements 
EntityManagerFactory, Application
 
 
     private EntityManager _getEntityManager( UUID applicationId ) {
-        EntityManager em = new CpEntityManager(cassandraService, counterUtils, 
indexService, managerCache, metricsFactory,
-            entityCollectionManagerFactory, graphManagerFactory,  
applicationId );
+        EntityManager em = new CpEntityManager(cassandraService, counterUtils, 
indexService, managerCache, metricsFactory, pipelineBuilderFactory,  
applicationId );
         return em;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/23acdd50/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
index af926bf..e7ef0ff 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
@@ -109,9 +109,7 @@ public class CpRelationManager implements RelationManager {
     private static final Logger logger = LoggerFactory.getLogger( 
CpRelationManager.class );
 
     private ManagerCache managerCache;
-    private EntityCollectionManagerFactory entityCollectionManagerFactory;
-    private GraphManagerFactory graphManagerFactory;
-    private PipelineBuilderFactory pipelineBuilderFactory;
+    private final PipelineBuilderFactory pipelineBuilderFactory;
 
     private EntityManager em;
 
@@ -130,8 +128,7 @@ public class CpRelationManager implements RelationManager {
 
 
     public CpRelationManager( final MetricsFactory metricsFactory, final 
ManagerCache managerCache,
-                              final EntityCollectionManagerFactory 
entityCollectionManagerFactory,
-                              final GraphManagerFactory graphManagerFactory, 
final AsyncIndexService indexService,
+                              final PipelineBuilderFactory 
pipelineBuilderFactory, final AsyncIndexService indexService,
                               final EntityManager em, final UUID 
applicationId, final EntityRef headEntity ) {
 
 
@@ -148,9 +145,7 @@ public class CpRelationManager implements RelationManager {
         this.headEntity = headEntity;
         this.managerCache = managerCache;
         this.applicationScope = CpNamingUtils.getApplicationScope( 
applicationId );
-
-        this.entityCollectionManagerFactory = entityCollectionManagerFactory;
-        this.graphManagerFactory = graphManagerFactory;
+        this.pipelineBuilderFactory = pipelineBuilderFactory;
 
         this.metricsFactory = metricsFactory;
         this.updateCollectionTimer =

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/23acdd50/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineModule.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineModule.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineModule.java
index 8afe2f8..55b84af 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineModule.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineModule.java
@@ -20,7 +20,10 @@
 package org.apache.usergrid.corepersistence.pipeline;
 
 
+import org.apache.usergrid.corepersistence.pipeline.read.ReadFilterFactory;
+import org.apache.usergrid.corepersistence.pipeline.read.ReadFilterFactoryImpl;
 import org.apache.usergrid.corepersistence.pipeline.read.ReadPipelineBuilder;
+import 
org.apache.usergrid.corepersistence.pipeline.read.ReadPipelineBuilderImpl;
 
 import com.google.inject.AbstractModule;
 import com.google.inject.assistedinject.FactoryModuleBuilder;
@@ -35,6 +38,18 @@ public class PipelineModule extends AbstractModule {
     protected void configure() {
         //Use Guice to create the builder since we don't really need to do 
anything
         //other than DI when creating the filters
-       install(new FactoryModuleBuilder().build( ReadPipelineBuilder.class ));
+       bind( ReadFilterFactory.class ).to( ReadFilterFactoryImpl.class );
+
+
+          //Use Guice to create the builder since we don't really need to do 
anything
+        //other than DI when creating the filters
+       install( new FactoryModuleBuilder().implement( 
ReadPipelineBuilder.class, ReadPipelineBuilderImpl.class )
+                                          .build( PipelineBuilderFactory.class 
) );
+
+
+
+            //Use Guice to create the builder since we don't really need to do 
anything
+        //other than DI when creating the filters
+//       install( new FactoryModuleBuilder().build( ReadFilterFactory.class ) 
);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/23acdd50/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/Filter.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/Filter.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/Filter.java
index 670d2ed..f50a2f4 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/Filter.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/Filter.java
@@ -37,12 +37,12 @@ public interface Filter<T> extends 
Observable.Transformer<Id, T> {
 
 
     /**
-     * Set the id of this command in it's execution environment
+     * Set the id of this filter in its execution environment
      */
     void setId( final int id );
 
     /**
-     * Set the cursor cache into the command
+     * Set the cursor cache into the filter
      *
      * @param readCache Set the cache that was used in the request
      * @param writeCache Set the cache to be used when writing the results
@@ -50,7 +50,7 @@ public interface Filter<T> extends Observable.Transformer<Id, 
T> {
     void setCursorCaches( final RequestCursor readCache, final ResponseCursor 
writeCache );
 
     /**
-     * Set the application scope of the command
+     * Set the application scope of the filter
      * @param applicationScope
      */
     void setApplicationScope(final ApplicationScope applicationScope);

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/23acdd50/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterFactory.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterFactory.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterFactory.java
deleted file mode 100644
index 8f9776e..0000000
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterFactory.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.pipeline.read;
-
-
-import 
org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.QueryCollectionElasticSearchCollectorFilter;
-import 
org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.QueryConnectionElasticSearchCollectorFilter;
-import org.apache.usergrid.corepersistence.pipeline.read.entity.EntityIdFilter;
-import 
org.apache.usergrid.corepersistence.pipeline.read.entity.EntityLoadCollectorFilter;
-import 
org.apache.usergrid.corepersistence.pipeline.read.graph.ReadGraphCollectionByIdFilter;
-import 
org.apache.usergrid.corepersistence.pipeline.read.graph.ReadGraphCollectionFilter;
-import 
org.apache.usergrid.corepersistence.pipeline.read.graph.ReadGraphConnectionByIdFilter;
-import 
org.apache.usergrid.corepersistence.pipeline.read.graph.ReadGraphConnectionByTypeFilter;
-import 
org.apache.usergrid.corepersistence.pipeline.read.graph.ReadGraphConnectionFilter;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-import com.google.common.base.Optional;
-
-
-/**
- * A factory for generating read commands
- */
-public interface FilterFactory {
-
-
-    /**
-     * Generate a new instance of the command with the specified parameters
-     */
-    ReadGraphCollectionFilter readGraphCollectionCommand( final String 
collectionName );
-
-    /**
-     * Read a connection between two entities, the incoming and the target 
entity
-     * @param collectionName
-     * @param targetId
-     * @return
-     */
-    ReadGraphCollectionByIdFilter readGraphCollectionByIdFilter(final String 
collectionName, final Id targetId);
-
-    /**
-     * Generate a new instance of the command with the specified parameters
-     */
-    ReadGraphConnectionFilter readGraphConnectionCommand( final String 
connectionName );
-
-    /**
-     * Generate a new instance of the command with the specified parameters
-     */
-    ReadGraphConnectionByTypeFilter readGraphConnectionCommand( final String 
connectionName, final String entityType );
-
-
-    /**
-     * Read a connection directly between two identifiers
-     * @param connectionName
-     * @param targetId
-     * @return
-     */
-    ReadGraphConnectionByIdFilter readGraphConnectionByIdFilter(final String 
connectionName, final Id targetId);
-
-    /**
-     * Generate a new instance of the command with the specified parameters
-     */
-    EntityLoadCollectorFilter entityLoadCollector();
-
-    /**
-     * Generate a new instance of the command with the specified parameters
-     */
-    QueryCollectionElasticSearchCollectorFilter 
queryCollectionElasticSearchCollector( final String collectionName, final 
String query);
-
-
-    /**
-     * Generate a new instance of the command with the specified parameters
-     */
-    QueryConnectionElasticSearchCollectorFilter 
queryConnectionElasticSearchCollector( final String connectionName,final String 
query);
-
-
-    /**
-     * Generate a new instance of the command with the specified parameters
-     */
-    QueryConnectionElasticSearchCollectorFilter 
queryConnectionElasticSearchCollector( final String connectionName, final 
String connectionEntityType, final String query );
-
-
-    /**
-     * Get an entity id filter.  Used as a 1.0->2.0 bridge since we're not 
doing full traversals
-     */
-    EntityIdFilter getEntityIdFilter( final Id entityId );
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/23acdd50/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadFilterFactory.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadFilterFactory.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadFilterFactory.java
new file mode 100644
index 0000000..92bdacb
--- /dev/null
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadFilterFactory.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.read;
+
+
+import 
org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.QueryCollectionElasticSearchCollectorFilter;
+import 
org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.QueryConnectionElasticSearchCollectorFilter;
+import org.apache.usergrid.corepersistence.pipeline.read.entity.EntityIdFilter;
+import 
org.apache.usergrid.corepersistence.pipeline.read.entity.EntityLoadCollectorFilter;
+import 
org.apache.usergrid.corepersistence.pipeline.read.graph.ReadGraphCollectionByIdFilter;
+import 
org.apache.usergrid.corepersistence.pipeline.read.graph.ReadGraphCollectionFilter;
+import 
org.apache.usergrid.corepersistence.pipeline.read.graph.ReadGraphConnectionByIdFilter;
+import 
org.apache.usergrid.corepersistence.pipeline.read.graph.ReadGraphConnectionByTypeFilter;
+import 
org.apache.usergrid.corepersistence.pipeline.read.graph.ReadGraphConnectionFilter;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.google.common.base.Optional;
+
+
+/**
+ * A factory for generating read commands
+ */
+public interface ReadFilterFactory {
+
+
+    /**
+     * Generate a new instance of the command with the specified parameters
+     */
+    ReadGraphCollectionFilter readGraphCollectionCommand( final String 
collectionName );
+
+    /**
+     * Read a connection between two entities, the incoming and the target 
entity
+     * @param collectionName
+     * @param targetId
+     * @return
+     */
+    ReadGraphCollectionByIdFilter readGraphCollectionByIdFilter(final String 
collectionName, final Id targetId);
+
+    /**
+     * Generate a new instance of the command with the specified parameters
+     */
+    ReadGraphConnectionFilter readGraphConnectionCommand( final String 
connectionName );
+
+    /**
+     * Generate a new instance of the command with the specified parameters
+     */
+    ReadGraphConnectionByTypeFilter readGraphConnectionCommand( final String 
connectionName, final String entityType );
+
+
+    /**
+     * Read a connection directly between two identifiers
+     * @param connectionName
+     * @param targetId
+     * @return
+     */
+    ReadGraphConnectionByIdFilter readGraphConnectionByIdFilter(final String 
connectionName, final Id targetId);
+
+    /**
+     * Generate a new instance of the command with the specified parameters
+     */
+    EntityLoadCollectorFilter entityLoadCollector();
+
+    /**
+     * Generate a new instance of the command with the specified parameters
+     */
+    QueryCollectionElasticSearchCollectorFilter 
queryCollectionElasticSearchCollector( final String collectionName, final 
String query);
+
+
+    /**
+     * Generate a new instance of the command with the specified parameters
+     */
+    QueryConnectionElasticSearchCollectorFilter 
queryConnectionElasticSearchCollector( final String connectionName,final String 
query);
+
+
+    /**
+     * Generate a new instance of the command with the specified parameters
+     */
+    QueryConnectionElasticSearchCollectorFilter 
queryConnectionElasticSearchCollector( final String connectionName, final 
String connectionEntityType, final String query );
+
+
+    /**
+     * Get an entity id filter.  Used as a 1.0->2.0 bridge since we're not 
doing full traversals
+     */
+    EntityIdFilter getEntityIdFilter( final Id entityId );
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/23acdd50/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadFilterFactoryImpl.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadFilterFactoryImpl.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadFilterFactoryImpl.java
new file mode 100644
index 0000000..19162bb
--- /dev/null
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadFilterFactoryImpl.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.read;
+
+
+import 
org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.QueryCollectionElasticSearchCollectorFilter;
+import 
org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.QueryConnectionElasticSearchCollectorFilter;
+import org.apache.usergrid.corepersistence.pipeline.read.entity.EntityIdFilter;
+import 
org.apache.usergrid.corepersistence.pipeline.read.entity.EntityLoadCollectorFilter;
+import 
org.apache.usergrid.corepersistence.pipeline.read.graph.ReadGraphCollectionByIdFilter;
+import 
org.apache.usergrid.corepersistence.pipeline.read.graph.ReadGraphCollectionFilter;
+import 
org.apache.usergrid.corepersistence.pipeline.read.graph.ReadGraphConnectionByIdFilter;
+import 
org.apache.usergrid.corepersistence.pipeline.read.graph.ReadGraphConnectionByTypeFilter;
+import 
org.apache.usergrid.corepersistence.pipeline.read.graph.ReadGraphConnectionFilter;
+import org.apache.usergrid.persistence.Query;
+import 
org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
+import org.apache.usergrid.persistence.graph.GraphManagerFactory;
+import org.apache.usergrid.persistence.index.EntityIndexFactory;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+
+@Singleton
+public class ReadFilterFactoryImpl implements ReadFilterFactory {
+
+
+    private final GraphManagerFactory graphManagerFactory;
+    private final EntityIndexFactory entityIndexFactory;
+    private final EntityCollectionManagerFactory 
entityCollectionManagerFactory;
+
+
+    @Inject
+    public ReadFilterFactoryImpl( final GraphManagerFactory 
graphManagerFactory,
+                                  final EntityIndexFactory entityIndexFactory,
+                                  final EntityCollectionManagerFactory 
entityCollectionManagerFactory ) {
+
+
+        this.graphManagerFactory = graphManagerFactory;
+        this.entityIndexFactory = entityIndexFactory;
+        this.entityCollectionManagerFactory = entityCollectionManagerFactory;
+    }
+
+
+    @Override
+    public ReadGraphCollectionFilter readGraphCollectionCommand( final String 
collectionName ) {
+        return new ReadGraphCollectionFilter( graphManagerFactory, 
collectionName );
+    }
+
+
+    @Override
+    public ReadGraphCollectionByIdFilter readGraphCollectionByIdFilter( final 
String collectionName,
+                                                                        final 
Id targetId ) {
+        return new ReadGraphCollectionByIdFilter( graphManagerFactory, 
collectionName, targetId );
+    }
+
+
+    @Override
+    public ReadGraphConnectionFilter readGraphConnectionCommand( final String 
connectionName ) {
+        return new ReadGraphConnectionFilter( graphManagerFactory, 
connectionName );
+    }
+
+
+    @Override
+    public ReadGraphConnectionByTypeFilter readGraphConnectionCommand( final 
String connectionName,
+                                                                       final 
String entityType ) {
+        return new ReadGraphConnectionByTypeFilter( graphManagerFactory, 
connectionName, entityType );
+    }
+
+
+    @Override
+    public ReadGraphConnectionByIdFilter readGraphConnectionByIdFilter( final 
String connectionName,
+                                                                        final 
Id targetId ) {
+        return new ReadGraphConnectionByIdFilter( graphManagerFactory, 
connectionName, targetId );
+    }
+
+
+    @Override
+    public EntityLoadCollectorFilter entityLoadCollector() {
+        return new EntityLoadCollectorFilter( entityCollectionManagerFactory );
+    }
+
+
+    /**
+     * TODO refactor these impls to use RX internally, as well as remove the 
query object
+     */
+    @Override
+    public QueryCollectionElasticSearchCollectorFilter 
queryCollectionElasticSearchCollector(
+        final String collectionName, final String query ) {
+
+        final Query queryObject = Query.fromQL( query );
+
+        final QueryCollectionElasticSearchCollectorFilter filter =
+            new QueryCollectionElasticSearchCollectorFilter( 
entityCollectionManagerFactory, entityIndexFactory,
+                collectionName, queryObject );
+
+        return filter;
+    }
+
+
+    @Override
+    public QueryConnectionElasticSearchCollectorFilter 
queryConnectionElasticSearchCollector(
+        final String connectionName, final String query ) {
+
+        final Query queryObject = Query.fromQL( query );
+
+        final QueryConnectionElasticSearchCollectorFilter filter =
+            new QueryConnectionElasticSearchCollectorFilter( 
entityCollectionManagerFactory, entityIndexFactory,
+                connectionName, queryObject );
+
+        return filter;
+    }
+
+
+    @Override
+    public QueryConnectionElasticSearchCollectorFilter 
queryConnectionElasticSearchCollector(
+        final String connectionName, final String connectionEntityType, final 
String query ) {
+
+        final Query queryObject = Query.fromQL( query );
+        queryObject.setConnectionType( connectionEntityType );
+
+        final QueryConnectionElasticSearchCollectorFilter filter =
+            new QueryConnectionElasticSearchCollectorFilter( 
entityCollectionManagerFactory, entityIndexFactory,
+                connectionName, queryObject );
+
+        return filter;
+    }
+
+
+    @Override
+    public EntityIdFilter getEntityIdFilter( final Id entityId ) {
+        return new EntityIdFilter( entityId );
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/23acdd50/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadPipelineBuilder.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadPipelineBuilder.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadPipelineBuilder.java
index d40cb12..5d83dac 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadPipelineBuilder.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadPipelineBuilder.java
@@ -54,6 +54,7 @@ public interface ReadPipelineBuilder {
      */
     ReadPipelineBuilder setStartId(final Id id);
 
+
     /**
      * Add a get entity to the pipeline
      */
@@ -92,7 +93,7 @@ public interface ReadPipelineBuilder {
 
 
     /**
-     * Get all entities in a connection with a query
+     * Get all entities in a connection with a query and a target entity type
      */
     ReadPipelineBuilder connectionWithQuery( final String connectionName, 
final String entityType, final String query);
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/23acdd50/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadPipelineBuilderImpl.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadPipelineBuilderImpl.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadPipelineBuilderImpl.java
index 7319f21..7ffe957 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadPipelineBuilderImpl.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadPipelineBuilderImpl.java
@@ -39,7 +39,7 @@ import rx.Observable;
 public class ReadPipelineBuilderImpl implements ReadPipelineBuilder {
 
 
-    private final FilterFactory filterFactory;
+    private final ReadFilterFactory readFilterFactory;
 
     private final DataPipeline pipeline;
 
@@ -54,11 +54,16 @@ public class ReadPipelineBuilderImpl implements 
ReadPipelineBuilder {
 
 
     @Inject
-    public ReadPipelineBuilderImpl( final FilterFactory filterFactory,
+    public ReadPipelineBuilderImpl( final ReadFilterFactory readFilterFactory,
                                     @Assisted final ApplicationScope 
applicationScope ) {
-        this.filterFactory = filterFactory;
+        this.readFilterFactory = readFilterFactory;
+
+        //set up our pipeline with our application scope
         this.pipeline = new DataPipeline( applicationScope );
+
+        //init our cursor to empty
         this.cursor = Optional.absent();
+
         //set the default limit
         this.limit = Optional.absent();
     }
@@ -84,7 +89,7 @@ public class ReadPipelineBuilderImpl implements 
ReadPipelineBuilder {
 
     @Override
     public ReadPipelineBuilder setStartId( final Id id ) {
-        pipeline.withTraverseCommand( filterFactory.getEntityIdFilter( id ) );
+        pipeline.withTraverseCommand( readFilterFactory.getEntityIdFilter( id 
) );
 
         this.collectorFilter = null;
 
@@ -96,7 +101,7 @@ public class ReadPipelineBuilderImpl implements 
ReadPipelineBuilder {
     @Override
     public ReadPipelineBuilder getEntityViaCollection( final String 
collectionName, final Id entityId ) {
 
-        pipeline.withTraverseCommand( 
filterFactory.readGraphCollectionByIdFilter( collectionName, entityId ) );
+        pipeline.withTraverseCommand( 
readFilterFactory.readGraphCollectionByIdFilter( collectionName, entityId ) );
 
         setEntityLoaderFilter();
 
@@ -108,7 +113,7 @@ public class ReadPipelineBuilderImpl implements 
ReadPipelineBuilder {
     public ReadPipelineBuilder getCollection( final String collectionName ) {
 
 
-        pipeline.withTraverseCommand( 
filterFactory.readGraphCollectionCommand( collectionName ) );
+        pipeline.withTraverseCommand( 
readFilterFactory.readGraphCollectionCommand( collectionName ) );
 
         setEntityLoaderFilter();
 
@@ -120,7 +125,7 @@ public class ReadPipelineBuilderImpl implements 
ReadPipelineBuilder {
     public ReadPipelineBuilder getCollectionWithQuery( final String 
collectionName, final String query ) {
 
         //TODO, this should really be 2 a TraverseFilter with an entityLoad 
collector
-        collectorFilter = filterFactory.queryCollectionElasticSearchCollector( 
collectionName, query );
+        collectorFilter = 
readFilterFactory.queryCollectionElasticSearchCollector( collectionName, query 
);
         return this;
     }
 
@@ -128,7 +133,7 @@ public class ReadPipelineBuilderImpl implements 
ReadPipelineBuilder {
     @Override
     public ReadPipelineBuilder getEntityViaConnection( final String 
connectionName, final Id entityId ) {
 
-        pipeline.withTraverseCommand( 
filterFactory.readGraphConnectionByIdFilter( connectionName, entityId ) );
+        pipeline.withTraverseCommand( 
readFilterFactory.readGraphConnectionByIdFilter( connectionName, entityId ) );
         setEntityLoaderFilter();
 
         return this;
@@ -138,7 +143,7 @@ public class ReadPipelineBuilderImpl implements 
ReadPipelineBuilder {
     @Override
     public ReadPipelineBuilder getConnection( final String connectionName ) {
 
-        pipeline.withTraverseCommand( 
filterFactory.readGraphConnectionCommand( connectionName ) );
+        pipeline.withTraverseCommand( 
readFilterFactory.readGraphConnectionCommand( connectionName ) );
         setEntityLoaderFilter();
 
         return this;
@@ -147,7 +152,7 @@ public class ReadPipelineBuilderImpl implements 
ReadPipelineBuilder {
 
     @Override
     public ReadPipelineBuilder getConnection( final String connectionName, 
final String entityType ) {
-        pipeline.withTraverseCommand( 
filterFactory.readGraphConnectionCommand( connectionName, entityType ) );
+        pipeline.withTraverseCommand( 
readFilterFactory.readGraphConnectionCommand( connectionName, entityType ) );
         setEntityLoaderFilter();
 
         return this;
@@ -164,7 +169,7 @@ public class ReadPipelineBuilderImpl implements 
ReadPipelineBuilder {
     public ReadPipelineBuilder connectionWithQuery( final String 
connectionName, final String query ) {
 
         //TODO, this should really be 2 a TraverseFilter with an entityLoad 
collector
-        collectorFilter = filterFactory.queryConnectionElasticSearchCollector( 
connectionName, query );
+        collectorFilter = 
readFilterFactory.queryConnectionElasticSearchCollector( connectionName, query 
);
 
         return this;
     }
@@ -176,7 +181,7 @@ public class ReadPipelineBuilderImpl implements 
ReadPipelineBuilder {
 
         //TODO, this should really be 2 a TraverseFilter with an entityLoad 
collector
         collectorFilter =
-            filterFactory.queryConnectionElasticSearchCollector( 
connectionName, entityType, query);
+            readFilterFactory.queryConnectionElasticSearchCollector( 
connectionName, entityType, query);
         return this;
     }
 
@@ -192,6 +197,6 @@ public class ReadPipelineBuilderImpl implements 
ReadPipelineBuilder {
 
 
     private void setEntityLoaderFilter() {
-        collectorFilter = filterFactory.entityLoadCollector();
+        collectorFilter = readFilterFactory.entityLoadCollector();
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/23acdd50/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/AbstractQueryElasticSearchCollectorFilter.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/AbstractQueryElasticSearchCollectorFilter.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/AbstractQueryElasticSearchCollectorFilter.java
index 01ab21f..a843f51 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/AbstractQueryElasticSearchCollectorFilter.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/AbstractQueryElasticSearchCollectorFilter.java
@@ -29,8 +29,8 @@ import org.apache.usergrid.persistence.Query;
 import org.apache.usergrid.persistence.Results;
 import org.apache.usergrid.persistence.SimpleEntityRef;
 import org.apache.usergrid.persistence.index.ApplicationEntityIndex;
+import org.apache.usergrid.persistence.index.EntityIndexFactory;
 import org.apache.usergrid.persistence.index.SearchEdge;
-import org.apache.usergrid.persistence.index.SearchType;
 import org.apache.usergrid.persistence.index.SearchTypes;
 import org.apache.usergrid.persistence.model.entity.Id;
 
@@ -54,14 +54,14 @@ public abstract class 
AbstractQueryElasticSearchCollectorFilter extends Abstract
     implements CollectorFilter<Results> {
 
 
-    protected final ApplicationEntityIndex entityIndex;
+    protected final EntityIndexFactory applicationEntityIndex;
     protected final Query query;
     private int limit;
 
 
     @Inject
-    protected AbstractQueryElasticSearchCollectorFilter( final 
ApplicationEntityIndex entityIndex, final Query query ) {
-        this.entityIndex = entityIndex;
+    protected AbstractQueryElasticSearchCollectorFilter( final 
EntityIndexFactory applicationEntityIndex, final Query query ) {
+        this.applicationEntityIndex = applicationEntityIndex;
         this.query = query;
     }
 
@@ -69,6 +69,10 @@ public abstract class 
AbstractQueryElasticSearchCollectorFilter extends Abstract
     @Override
     public Observable<Results> call( final Observable<Id> idObservable ) {
 
+
+        final ApplicationEntityIndex
+            entityIndex = applicationEntityIndex.createApplicationEntityIndex( 
applicationScope );
+
         return idObservable.flatMap( id -> {
 
             //TODO, refactor this logic to use Observables.  make this a 
TraverseFilter and load entities with the entity loader collector
@@ -76,6 +80,8 @@ public abstract class 
AbstractQueryElasticSearchCollectorFilter extends Abstract
             final SearchEdge searchEdge = getSearchEdge( id );
             final SearchTypes searchTypes = getSearchTypes();
 
+
+
             final Iterable<Results> executor =
                 new ElasticSearchQueryExecutor( resultsLoaderFactory, 
entityIndex, applicationScope,
                     searchEdge, searchTypes, query.withLimit( limit ) );

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/23acdd50/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/QueryCollectionElasticSearchCollectorFilter.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/QueryCollectionElasticSearchCollectorFilter.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/QueryCollectionElasticSearchCollectorFilter.java
index f0e8139..4813978 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/QueryCollectionElasticSearchCollectorFilter.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/QueryCollectionElasticSearchCollectorFilter.java
@@ -25,7 +25,9 @@ import 
org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.impl.Resu
 import org.apache.usergrid.persistence.EntityRef;
 import org.apache.usergrid.persistence.Query;
 import org.apache.usergrid.persistence.collection.EntityCollectionManager;
+import 
org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
 import org.apache.usergrid.persistence.index.ApplicationEntityIndex;
+import org.apache.usergrid.persistence.index.EntityIndexFactory;
 import org.apache.usergrid.persistence.index.SearchEdge;
 import org.apache.usergrid.persistence.index.SearchTypes;
 import org.apache.usergrid.persistence.model.entity.Id;
@@ -41,18 +43,18 @@ import static 
org.apache.usergrid.corepersistence.util.CpNamingUtils.createColle
  */
 public class QueryCollectionElasticSearchCollectorFilter extends 
AbstractQueryElasticSearchCollectorFilter {
 
-    private final EntityCollectionManager entityCollectionManager;
-    private final ApplicationEntityIndex applicationEntityIndex;
+    private final EntityCollectionManagerFactory 
entityCollectionManagerFactory;
+    private final EntityIndexFactory entityIndexFactory;
     private final String collectionName;
 
 
     @Inject
-    protected QueryCollectionElasticSearchCollectorFilter( final 
EntityCollectionManager entityCollectionManager,
-                                                           final 
ApplicationEntityIndex applicationEntityIndex,
+    public QueryCollectionElasticSearchCollectorFilter( final 
EntityCollectionManagerFactory entityCollectionManagerFactory,
+                                                           final 
EntityIndexFactory entityIndexFactory,
                                                            @Assisted final 
String collectionName , @Assisted final Query query ) {
-        super( applicationEntityIndex, query );
-        this.entityCollectionManager = entityCollectionManager;
-        this.applicationEntityIndex = applicationEntityIndex;
+        super( entityIndexFactory, query );
+        this.entityIndexFactory = entityIndexFactory;
+        this.entityCollectionManagerFactory = entityCollectionManagerFactory;
         this.collectionName = collectionName;
     }
 
@@ -75,9 +77,11 @@ public class QueryCollectionElasticSearchCollectorFilter 
extends AbstractQueryEl
 
     @Override
     protected ResultsLoaderFactory getResultsLoaderFactory( final Id id ) {
+        final EntityCollectionManager entityCollectionManager = 
entityCollectionManagerFactory.createCollectionManager( applicationScope );
+        final ApplicationEntityIndex entityIndex = 
entityIndexFactory.createApplicationEntityIndex( applicationScope );
 
         final EntityRef entityRef = getRef( id );
-        return new ConnectionResultsLoaderFactoryImpl( 
entityCollectionManager, applicationEntityIndex, entityRef,
+        return new ConnectionResultsLoaderFactoryImpl( 
entityCollectionManager, entityIndex, entityRef,
             collectionName );
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/23acdd50/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/QueryConnectionElasticSearchCollectorFilter.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/QueryConnectionElasticSearchCollectorFilter.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/QueryConnectionElasticSearchCollectorFilter.java
index 2b380df..2f7a6b3 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/QueryConnectionElasticSearchCollectorFilter.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/QueryConnectionElasticSearchCollectorFilter.java
@@ -25,7 +25,9 @@ import 
org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.impl.Resu
 import org.apache.usergrid.persistence.EntityRef;
 import org.apache.usergrid.persistence.Query;
 import org.apache.usergrid.persistence.collection.EntityCollectionManager;
+import 
org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
 import org.apache.usergrid.persistence.index.ApplicationEntityIndex;
+import org.apache.usergrid.persistence.index.EntityIndexFactory;
 import org.apache.usergrid.persistence.index.SearchEdge;
 import org.apache.usergrid.persistence.index.SearchTypes;
 import org.apache.usergrid.persistence.model.entity.Id;
@@ -41,19 +43,19 @@ import static 
org.apache.usergrid.corepersistence.util.CpNamingUtils.createConne
  */
 public class QueryConnectionElasticSearchCollectorFilter extends 
AbstractQueryElasticSearchCollectorFilter {
 
-    private final EntityCollectionManager entityCollectionManager;
-    private final ApplicationEntityIndex applicationEntityIndex;
+    private final EntityCollectionManagerFactory 
entityCollectionManagerFactory;
+    private final EntityIndexFactory entityIndexFactory;
     private final String connectionName;
 
 
     @Inject
-    protected QueryConnectionElasticSearchCollectorFilter( final 
EntityCollectionManager entityCollectionManager,
-                                                           final 
ApplicationEntityIndex applicationEntityIndex,
-                                                           @Assisted final 
String connectionName,
-                                                           @Assisted final 
Query query ) {
-        super( applicationEntityIndex, query );
-        this.entityCollectionManager = entityCollectionManager;
-        this.applicationEntityIndex = applicationEntityIndex;
+    public QueryConnectionElasticSearchCollectorFilter(
+        final EntityCollectionManagerFactory entityCollectionManagerFactory,
+        final EntityIndexFactory entityIndexFactory, @Assisted final String 
connectionName,
+        @Assisted final Query query ) {
+        super( entityIndexFactory, query );
+        this.entityCollectionManagerFactory = entityCollectionManagerFactory;
+        this.entityIndexFactory = entityIndexFactory;
         this.connectionName = connectionName;
     }
 
@@ -77,8 +79,13 @@ public class QueryConnectionElasticSearchCollectorFilter 
extends AbstractQueryEl
 
     @Override
     protected ResultsLoaderFactory getResultsLoaderFactory( final Id id ) {
+
+        final EntityCollectionManager entityCollectionManager =
+            entityCollectionManagerFactory.createCollectionManager( 
applicationScope );
+        final ApplicationEntityIndex entityIndex = 
entityIndexFactory.createApplicationEntityIndex( applicationScope );
+
         final EntityRef entityRef = getRef( id );
-        return new ConnectionResultsLoaderFactoryImpl( 
entityCollectionManager, applicationEntityIndex, entityRef,
+        return new ConnectionResultsLoaderFactoryImpl( 
entityCollectionManager, entityIndex, entityRef,
             connectionName );
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/23acdd50/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/entity/EntityLoadCollectorFilter.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/entity/EntityLoadCollectorFilter.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/entity/EntityLoadCollectorFilter.java
index 74f626d..3b140ae 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/entity/EntityLoadCollectorFilter.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/entity/EntityLoadCollectorFilter.java
@@ -50,15 +50,12 @@ public class EntityLoadCollectorFilter extends 
AbstractFilter<Results, Serializa
     implements CollectorFilter<Results> {
 
     private final EntityCollectionManagerFactory 
entityCollectionManagerFactory;
-    private final ApplicationScope applicationScope;
     private int resultSize;
 
 
     @Inject
-    public EntityLoadCollectorFilter( final EntityCollectionManagerFactory 
entityCollectionManagerFactory,
-                                      final ApplicationScope applicationScope 
) {
+    public EntityLoadCollectorFilter( final EntityCollectionManagerFactory 
entityCollectionManagerFactory ) {
         this.entityCollectionManagerFactory = entityCollectionManagerFactory;
-        this.applicationScope = applicationScope;
     }
 
 
@@ -92,7 +89,7 @@ public class EntityLoadCollectorFilter extends 
AbstractFilter<Results, Serializa
             final Observable<MvccEntity> mvccEntityObservable = 
Observable.from( entitySet.getEntities() );
 
             //convert them to our old entity model, then filter nulls, meaning 
they weren't found
-            return mvccEntityObservable.map( mvccEntity -> mapEntity( 
mvccEntity ) ).filter( entity -> entity == null )
+            return mvccEntityObservable.map( mvccEntity -> mapEntity( 
mvccEntity ) ).filter( entity -> entity != null )
 
                 //convert them to a list, then map them into results
                 .toList().map( entities -> {
@@ -100,9 +97,7 @@ public class EntityLoadCollectorFilter extends 
AbstractFilter<Results, Serializa
                     results.setCursor( generateCursor() );
 
                     return results;
-                } )
-                //if no results are present, return an empty results
-                .singleOrDefault( new Results(  ) );
+                } );
         } );
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/23acdd50/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphFilter.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphFilter.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphFilter.java
index 4bdcdad..06aae83 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphFilter.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphFilter.java
@@ -32,22 +32,18 @@ import org.apache.usergrid.persistence.model.entity.Id;
 import com.google.common.base.Optional;
 
 import rx.Observable;
-import rx.functions.Action1;
-import rx.functions.Func1;
 
 
 /**
  * Command for reading graph edges
  */
-public abstract class AbstractReadGraphFilter extends AbstractFilter<Id, Edge>
-    implements TraverseFilter {
+public abstract class AbstractReadGraphFilter extends AbstractFilter<Id, Edge> 
implements TraverseFilter {
 
     private final GraphManagerFactory graphManagerFactory;
 
 
     /**
      * Create a new instance of our command
-     * @param graphManagerFactory
      */
     public AbstractReadGraphFilter( final GraphManagerFactory 
graphManagerFactory ) {
         this.graphManagerFactory = graphManagerFactory;
@@ -67,20 +63,20 @@ public abstract class AbstractReadGraphFilter extends 
AbstractFilter<Id, Edge>
 
 
         //return all ids that are emitted from this edge
-        return observable.flatMap( new Func1<Id, Observable<Id>>() {
-
-            @Override
-            public Observable<Id> call( final Id id ) {
-
-                final SimpleSearchByEdgeType search = new 
SimpleSearchByEdgeType(id,edgeName, Long.MAX_VALUE,
-                    SearchByEdgeType.Order.DESCENDING, startFromCursor   );
-
-                /**
-                 * TODO, pass a message with pointers to our cursor values to 
be generated later
-                 */
-                return graphManager.loadEdgesFromSource( search ).doOnNext( 
edge -> setCursor( edge ) ).map(
-                    edge -> edge.getTargetNode() );
-            }
+        return observable.flatMap( id -> {
+
+            final SimpleSearchByEdgeType search =
+                new SimpleSearchByEdgeType( id, edgeName, Long.MAX_VALUE, 
SearchByEdgeType.Order.DESCENDING,
+                    startFromCursor );
+
+            /**
+             * TODO, pass a message with pointers to our cursor values to be 
generated later
+             */
+            return graphManager.loadEdgesFromSource( search )
+                //set our cursor on every edge we traverse
+                .doOnNext( edge -> setCursor( edge ) )
+                    //map each edge to its target node id
+                .map( edge -> edge.getTargetNode() );
         } );
     }
 
@@ -91,10 +87,8 @@ public abstract class AbstractReadGraphFilter extends 
AbstractFilter<Id, Edge>
     }
 
 
-
     /**
      * Get the edge type name we should use when traversing
-     * @return
      */
     protected abstract String getEdgeTypeName();
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/23acdd50/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphCollectionByIdFilter.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphCollectionByIdFilter.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphCollectionByIdFilter.java
index 2ccad67..da6ad29 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphCollectionByIdFilter.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphCollectionByIdFilter.java
@@ -24,6 +24,7 @@ import org.apache.usergrid.corepersistence.util.CpNamingUtils;
 import org.apache.usergrid.persistence.graph.GraphManagerFactory;
 import org.apache.usergrid.persistence.model.entity.Id;
 
+import com.google.inject.Inject;
 import com.google.inject.assistedinject.Assisted;
 
 
@@ -34,6 +35,7 @@ public class ReadGraphCollectionByIdFilter extends 
AbstractReadGraphEdgeByIdFilt
 
     private final String collectionName;
 
+    @Inject
     public ReadGraphCollectionByIdFilter( final GraphManagerFactory 
graphManagerFactory, @Assisted final String collectionName, @Assisted final Id 
targetId ) {
         super( graphManagerFactory, targetId );
         this.collectionName = collectionName;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/23acdd50/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphCollectionFilter.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphCollectionFilter.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphCollectionFilter.java
index 3f0a70a..91ae7c3 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphCollectionFilter.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphCollectionFilter.java
@@ -25,7 +25,7 @@ import 
org.apache.usergrid.persistence.graph.GraphManagerFactory;
 import com.google.inject.Inject;
 import com.google.inject.assistedinject.Assisted;
 
-import static 
org.apache.usergrid.corepersistence.util.CpNamingUtils.getCollectionScopeNameFromCollectionName;
+import static 
org.apache.usergrid.corepersistence.util.CpNamingUtils.getEdgeTypeFromCollectionName;
 
 
 /**
@@ -48,6 +48,6 @@ public class ReadGraphCollectionFilter extends 
AbstractReadGraphFilter {
 
     @Override
     protected String getEdgeTypeName() {
-        return getCollectionScopeNameFromCollectionName( collectionName );
+        return getEdgeTypeFromCollectionName( collectionName );
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/23acdd50/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphConnectionByIdFilter.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphConnectionByIdFilter.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphConnectionByIdFilter.java
index f16867d..4756d33 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphConnectionByIdFilter.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphConnectionByIdFilter.java
@@ -24,6 +24,7 @@ import org.apache.usergrid.corepersistence.util.CpNamingUtils;
 import org.apache.usergrid.persistence.graph.GraphManagerFactory;
 import org.apache.usergrid.persistence.model.entity.Id;
 
+import com.google.inject.Inject;
 import com.google.inject.assistedinject.Assisted;
 
 
@@ -34,6 +35,7 @@ public class ReadGraphConnectionByIdFilter extends 
AbstractReadGraphEdgeByIdFilt
 
     private final String connectionName;
 
+    @Inject
     public ReadGraphConnectionByIdFilter( final GraphManagerFactory 
graphManagerFactory,
                                           @Assisted final String 
connectionName, @Assisted final Id targetId ) {
         super( graphManagerFactory, targetId );

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/23acdd50/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphConnectionByTypeFilter.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphConnectionByTypeFilter.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphConnectionByTypeFilter.java
index 3057111..d5cdd66 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphConnectionByTypeFilter.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphConnectionByTypeFilter.java
@@ -30,13 +30,12 @@ import 
org.apache.usergrid.persistence.graph.impl.SimpleSearchByIdType;
 import org.apache.usergrid.persistence.model.entity.Id;
 
 import com.google.common.base.Optional;
+import com.google.inject.Inject;
 import com.google.inject.assistedinject.Assisted;
 
 import rx.Observable;
-import rx.functions.Action1;
-import rx.functions.Func1;
 
-import static 
org.apache.usergrid.corepersistence.util.CpNamingUtils.getConnectionScopeName;
+import static 
org.apache.usergrid.corepersistence.util.CpNamingUtils.getEdgeTypeFromConnectionType;
 
 
 /**
@@ -52,6 +51,7 @@ public class ReadGraphConnectionByTypeFilter extends 
AbstractFilter<Id, Edge> im
     /**
      * Create a new instance of our command
      */
+    @Inject
     public ReadGraphConnectionByTypeFilter( final GraphManagerFactory 
graphManagerFactory,
                                             @Assisted final String 
connectionName, @Assisted final String entityType ) {
         this.graphManagerFactory = graphManagerFactory;
@@ -69,7 +69,7 @@ public class ReadGraphConnectionByTypeFilter extends 
AbstractFilter<Id, Edge> im
         //set our our constant state
         final Optional<Edge> startFromCursor = getCursor();
 
-        final String edgeName = getConnectionScopeName( connectionName );
+        final String edgeName = getEdgeTypeFromConnectionType( connectionName 
);
 
 
         //return all ids that are emitted from this edge

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/23acdd50/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphConnectionFilter.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphConnectionFilter.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphConnectionFilter.java
index 49360f6..0d4971b 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphConnectionFilter.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphConnectionFilter.java
@@ -25,7 +25,7 @@ import 
org.apache.usergrid.persistence.graph.GraphManagerFactory;
 import com.google.inject.Inject;
 import com.google.inject.assistedinject.Assisted;
 
-import static 
org.apache.usergrid.corepersistence.util.CpNamingUtils.getConnectionScopeName;
+import static 
org.apache.usergrid.corepersistence.util.CpNamingUtils.getEdgeTypeFromConnectionType;
 
 
 /**
@@ -48,6 +48,6 @@ public class ReadGraphConnectionFilter extends 
AbstractReadGraphFilter {
 
     @Override
     protected String getEdgeTypeName() {
-        return getConnectionScopeName( connectionName );
+        return getEdgeTypeFromConnectionType( connectionName );
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/23acdd50/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ObservableQueryExecutor.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ObservableQueryExecutor.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ObservableQueryExecutor.java
index 382fb64..f5a2893 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ObservableQueryExecutor.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ObservableQueryExecutor.java
@@ -40,7 +40,8 @@ public class ObservableQueryExecutor implements QueryExecutor 
{
 
 
     public ObservableQueryExecutor( final Observable<Results> 
resultsObservable ) {
-        this.resultsObservable = resultsObservable;
+        // if there are no values, we must still emit an empty Results, so add a default
+        this.resultsObservable = resultsObservable.defaultIfEmpty( new 
Results() );
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/23acdd50/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpNamingUtils.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpNamingUtils.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpNamingUtils.java
index 5472071..c65db09 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpNamingUtils.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpNamingUtils.java
@@ -236,10 +236,10 @@ public class CpNamingUtils {
 
 
         if ( isCollectionEdgeType( edgeName ) ) {
-            return getCollectionScopeNameFromCollectionName( 
getCollectionName( edgeName ) );
+            return getCollectionName( edgeName ) ;
         }
 
-        return getConnectionScopeName( getConnectionType( edgeName ) );
+        return getConnectionType( edgeName ) ;
     }
 
 
@@ -260,20 +260,4 @@ public class CpNamingUtils {
     }
 
 
-    /**
-     * Only use when searching graph.  When searching ES use the 
<see>createCollectionSearchEdge</see>
-     */
-    public static String getCollectionScopeNameFromCollectionName( String name 
) {
-        String csn = EDGE_COLL_SUFFIX + name;
-        return csn.toLowerCase();
-    }
-
-
-    /**
-     * Only use when searching graph.  When searching ES use the 
<see>createConnectionSearchByEdge</see>
-     */
-    public static String getConnectionScopeName( String connectionType ) {
-        String csn = EDGE_CONN_SUFFIX + connectionType;
-        return csn.toLowerCase();
-    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/23acdd50/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/test/util/EdgeTestUtils.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/test/util/EdgeTestUtils.java
 
b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/test/util/EdgeTestUtils.java
index c81fa58..0239f4b 100644
--- 
a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/test/util/EdgeTestUtils.java
+++ 
b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/test/util/EdgeTestUtils.java
@@ -164,7 +164,7 @@ public class EdgeTestUtils {
      */
     public static SearchByIdType createSearchByEdgeAndId( final Id sourceId, 
final String type, final long maxVersion,
                                                           final String idType, 
final Edge last ) {
-        return new SimpleSearchByIdType( sourceId, type, maxVersion, 
SearchByEdgeType.Order.DESCENDING, idType, last );
+        return new SimpleSearchByIdType( sourceId, type, maxVersion, 
SearchByEdgeType.Order.DESCENDING, idType, Optional.of(last) );
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/23acdd50/stack/test-utils/src/main/java/org/apache/usergrid/setup/ConcurrentProcessSingleton.java
----------------------------------------------------------------------
diff --git 
a/stack/test-utils/src/main/java/org/apache/usergrid/setup/ConcurrentProcessSingleton.java
 
b/stack/test-utils/src/main/java/org/apache/usergrid/setup/ConcurrentProcessSingleton.java
index 7cb5073..214334b 100644
--- 
a/stack/test-utils/src/main/java/org/apache/usergrid/setup/ConcurrentProcessSingleton.java
+++ 
b/stack/test-utils/src/main/java/org/apache/usergrid/setup/ConcurrentProcessSingleton.java
@@ -43,8 +43,10 @@ public class ConcurrentProcessSingleton {
     public static final int LOCK_PORT = Integer.parseInt(
         System.getProperty( "test.lock.port", "10101" ) );
 
-    public static final boolean CLEAN_STORAGE =
-        Boolean.parseBoolean( System.getProperty( "test.clean.storage", 
"false" ) );
+//    public static final boolean CLEAN_STORAGE =
+//        Boolean.parseBoolean( System.getProperty( "test.clean.storage", 
"false" ) );
+
+    public static final boolean CLEAN_STORAGE = false;
 
 
     public static final long ONE_MINUTE = 60000;

Reply via email to