Repository: incubator-usergrid Updated Branches: refs/heads/USERGRID-593 9dc65dc8c -> 23acdd509
Query by edges collection working. ES Query by collection not working Cursor generation not implemented Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/23acdd50 Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/23acdd50 Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/23acdd50 Branch: refs/heads/USERGRID-593 Commit: 23acdd509ffd5b423d7cdf5c58a16ca062ab9e5a Parents: 9dc65dc Author: Todd Nine <tn...@apigee.com> Authored: Mon Apr 27 10:38:06 2015 -0600 Committer: Todd Nine <tn...@apigee.com> Committed: Mon Apr 27 10:38:06 2015 -0600 ---------------------------------------------------------------------- .../usergrid/corepersistence/CoreModule.java | 4 + .../corepersistence/CpEntityManager.java | 15 +- .../corepersistence/CpEntityManagerFactory.java | 11 +- .../corepersistence/CpRelationManager.java | 11 +- .../pipeline/PipelineModule.java | 17 ++- .../corepersistence/pipeline/read/Filter.java | 6 +- .../pipeline/read/FilterFactory.java | 102 ------------- .../pipeline/read/ReadFilterFactory.java | 102 +++++++++++++ .../pipeline/read/ReadFilterFactoryImpl.java | 152 +++++++++++++++++++ .../pipeline/read/ReadPipelineBuilder.java | 3 +- .../pipeline/read/ReadPipelineBuilderImpl.java | 31 ++-- ...stractQueryElasticSearchCollectorFilter.java | 14 +- ...yCollectionElasticSearchCollectorFilter.java | 20 ++- ...yConnectionElasticSearchCollectorFilter.java | 27 ++-- .../read/entity/EntityLoadCollectorFilter.java | 11 +- .../read/graph/AbstractReadGraphFilter.java | 36 ++--- .../graph/ReadGraphCollectionByIdFilter.java | 2 + .../read/graph/ReadGraphCollectionFilter.java | 4 +- .../graph/ReadGraphConnectionByIdFilter.java | 2 + .../graph/ReadGraphConnectionByTypeFilter.java | 8 +- .../read/graph/ReadGraphConnectionFilter.java | 4 +- .../results/ObservableQueryExecutor.java | 3 +- .../corepersistence/util/CpNamingUtils.java | 20 +-- 
.../graph/test/util/EdgeTestUtils.java | 2 +- .../setup/ConcurrentProcessSingleton.java | 6 +- 25 files changed, 390 insertions(+), 223 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/23acdd50/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java index 06fe058..ca20e23 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java @@ -31,6 +31,7 @@ import org.apache.usergrid.corepersistence.migration.CoreMigration; import org.apache.usergrid.corepersistence.migration.CoreMigrationPlugin; import org.apache.usergrid.corepersistence.migration.EntityTypeMappingMigration; import org.apache.usergrid.corepersistence.migration.MigrationModuleVersionPlugin; +import org.apache.usergrid.corepersistence.pipeline.PipelineModule; import org.apache.usergrid.corepersistence.rx.impl.AllApplicationsObservable; import org.apache.usergrid.corepersistence.rx.impl.AllApplicationsObservableImpl; import org.apache.usergrid.corepersistence.rx.impl.AllEntitiesInSystemImpl; @@ -158,6 +159,9 @@ public class CoreModule extends AbstractModule { install( new GuicyFigModule( ApplicationIdCacheFig.class ) ); + //install our pipeline modules + install(new PipelineModule()); + } } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/23acdd50/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java index 748a069..2d419b6 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java @@ -38,6 +38,7 @@ import org.slf4j.LoggerFactory; import org.springframework.util.Assert; import org.apache.usergrid.corepersistence.index.AsyncIndexService; +import org.apache.usergrid.corepersistence.pipeline.PipelineBuilderFactory; import org.apache.usergrid.corepersistence.util.CpEntityMapUtils; import org.apache.usergrid.corepersistence.util.CpNamingUtils; import org.apache.usergrid.persistence.AggregateCounter; @@ -179,9 +180,7 @@ public class CpEntityManager implements EntityManager { private final AsyncIndexService indexService; - - private EntityCollectionManagerFactory entityCollectionManagerFactory; - private GraphManagerFactory graphManagerFactory; + private PipelineBuilderFactory pipelineBuilderFactory; private boolean skipAggregateCounters; private MetricsFactory metricsFactory; @@ -224,16 +223,16 @@ public class CpEntityManager implements EntityManager { * @param graphManagerFactory */ public CpEntityManager( final CassandraService cass, final CounterUtils counterUtils, final AsyncIndexService indexService, final ManagerCache managerCache, - final MetricsFactory metricsFactory,final EntityCollectionManagerFactory entityCollectionManagerFactory, - final GraphManagerFactory graphManagerFactory , final UUID applicationId ) { - this.entityCollectionManagerFactory = entityCollectionManagerFactory; - this.graphManagerFactory = graphManagerFactory; + final MetricsFactory metricsFactory,final PipelineBuilderFactory pipelineBuilderFactory , final UUID applicationId ) { + Preconditions.checkNotNull( cass, "cass must not be null" ); Preconditions.checkNotNull( counterUtils, "counterUtils must not be null" ); Preconditions.checkNotNull( managerCache, "managerCache must not be 
null" ); Preconditions.checkNotNull( applicationId, "applicationId must not be null" ); Preconditions.checkNotNull( indexService, "indexService must not be null" ); + Preconditions.checkNotNull( pipelineBuilderFactory, "pipelineBuilderFactory must not be null" ); + this.pipelineBuilderFactory = pipelineBuilderFactory; this.managerCache = managerCache; @@ -747,7 +746,7 @@ public class CpEntityManager implements EntityManager { Preconditions.checkNotNull( entityRef, "entityRef cannot be null" ); CpRelationManager relationManager = - new CpRelationManager( metricsFactory, managerCache, entityCollectionManagerFactory, graphManagerFactory, indexService, this, applicationId, entityRef ); + new CpRelationManager( metricsFactory, managerCache, pipelineBuilderFactory, indexService, this, applicationId, entityRef ); return relationManager; } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/23acdd50/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java index f06cf40..0d10fd2 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java @@ -35,6 +35,7 @@ import org.apache.commons.lang.StringUtils; import org.apache.usergrid.corepersistence.index.AsyncIndexService; import org.apache.usergrid.corepersistence.index.ReIndexService; +import org.apache.usergrid.corepersistence.pipeline.PipelineBuilderFactory; import org.apache.usergrid.corepersistence.util.CpNamingUtils; import org.apache.usergrid.exception.ConflictException; import org.apache.usergrid.persistence.AbstractEntity; @@ -74,6 +75,7 @@ import 
org.apache.usergrid.persistence.model.entity.SimpleId; import org.apache.usergrid.persistence.model.util.UUIDGenerator; import org.apache.usergrid.utils.UUIDUtils; +import com.amazonaws.services.elastictranscoder.model.Pipeline; import com.google.common.base.Optional; import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; @@ -118,8 +120,6 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application private final ApplicationIdCache applicationIdCache; private ManagerCache managerCache; - private final EntityCollectionManagerFactory entityCollectionManagerFactory; - private final GraphManagerFactory graphManagerFactory; private CassandraService cassandraService; private CounterUtils counterUtils; @@ -127,6 +127,7 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application private final EntityIndex entityIndex; private final MetricsFactory metricsFactory; private final AsyncIndexService indexService; + private final PipelineBuilderFactory pipelineBuilderFactory; public CpEntityManagerFactory( final CassandraService cassandraService, final CounterUtils counterUtils, final Injector injector ) { @@ -139,10 +140,9 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application this.managerCache = injector.getInstance( ManagerCache.class ); this.metricsFactory = injector.getInstance( MetricsFactory.class ); this.indexService = injector.getInstance( AsyncIndexService.class ); + this.pipelineBuilderFactory = injector.getInstance( PipelineBuilderFactory.class ); this.applicationIdCache = injector.getInstance(ApplicationIdCacheFactory.class).getInstance( getManagementEntityManager() ); - this.entityCollectionManagerFactory = injector.getInstance( EntityCollectionManagerFactory.class ); - this.graphManagerFactory = injector.getInstance( GraphManagerFactory.class ); } @@ -199,8 +199,7 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application private 
EntityManager _getEntityManager( UUID applicationId ) { - EntityManager em = new CpEntityManager(cassandraService, counterUtils, indexService, managerCache, metricsFactory, - entityCollectionManagerFactory, graphManagerFactory, applicationId ); + EntityManager em = new CpEntityManager(cassandraService, counterUtils, indexService, managerCache, metricsFactory, pipelineBuilderFactory, applicationId ); return em; } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/23acdd50/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java index af926bf..e7ef0ff 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java @@ -109,9 +109,7 @@ public class CpRelationManager implements RelationManager { private static final Logger logger = LoggerFactory.getLogger( CpRelationManager.class ); private ManagerCache managerCache; - private EntityCollectionManagerFactory entityCollectionManagerFactory; - private GraphManagerFactory graphManagerFactory; - private PipelineBuilderFactory pipelineBuilderFactory; + private final PipelineBuilderFactory pipelineBuilderFactory; private EntityManager em; @@ -130,8 +128,7 @@ public class CpRelationManager implements RelationManager { public CpRelationManager( final MetricsFactory metricsFactory, final ManagerCache managerCache, - final EntityCollectionManagerFactory entityCollectionManagerFactory, - final GraphManagerFactory graphManagerFactory, final AsyncIndexService indexService, + final PipelineBuilderFactory pipelineBuilderFactory, final AsyncIndexService indexService, final EntityManager em, final UUID applicationId, final 
EntityRef headEntity ) { @@ -148,9 +145,7 @@ public class CpRelationManager implements RelationManager { this.headEntity = headEntity; this.managerCache = managerCache; this.applicationScope = CpNamingUtils.getApplicationScope( applicationId ); - - this.entityCollectionManagerFactory = entityCollectionManagerFactory; - this.graphManagerFactory = graphManagerFactory; + this.pipelineBuilderFactory = pipelineBuilderFactory; this.metricsFactory = metricsFactory; this.updateCollectionTimer = http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/23acdd50/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineModule.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineModule.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineModule.java index 8afe2f8..55b84af 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineModule.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineModule.java @@ -20,7 +20,10 @@ package org.apache.usergrid.corepersistence.pipeline; +import org.apache.usergrid.corepersistence.pipeline.read.ReadFilterFactory; +import org.apache.usergrid.corepersistence.pipeline.read.ReadFilterFactoryImpl; import org.apache.usergrid.corepersistence.pipeline.read.ReadPipelineBuilder; +import org.apache.usergrid.corepersistence.pipeline.read.ReadPipelineBuilderImpl; import com.google.inject.AbstractModule; import com.google.inject.assistedinject.FactoryModuleBuilder; @@ -35,6 +38,18 @@ public class PipelineModule extends AbstractModule { protected void configure() { //Use Guice to create the builder since we don't really need to do anything //other than DI when creating the filters - install(new FactoryModuleBuilder().build( ReadPipelineBuilder.class )); + bind( ReadFilterFactory.class ).to( ReadFilterFactoryImpl.class ); 
+ + + //Use Guice to create the builder since we don't really need to do anything + //other than DI when creating the filters + install( new FactoryModuleBuilder().implement( ReadPipelineBuilder.class, ReadPipelineBuilderImpl.class ) + .build( PipelineBuilderFactory.class ) ); + + + + //Use Guice to create the builder since we don't really need to do anything + //other than DI when creating the filters +// install( new FactoryModuleBuilder().build( ReadFilterFactory.class ) ); } } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/23acdd50/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/Filter.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/Filter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/Filter.java index 670d2ed..f50a2f4 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/Filter.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/Filter.java @@ -37,12 +37,12 @@ public interface Filter<T> extends Observable.Transformer<Id, T> { /** - * Set the id of this command in it's execution environment + * Set the id of this filter in its execution environment */ void setId( final int id ); /** - * Set the cursor cache into the command + * Set the cursor cache into the filter * * @param readCache Set the cache that was used in the request * @param writeCache Set the cache to be used when writing the results @@ -50,7 +50,7 @@ public interface Filter<T> extends Observable.Transformer<Id, T> { void setCursorCaches( final RequestCursor readCache, final ResponseCursor writeCache ); /** - * Set the application scope of the command + * Set the application scope of the filter * @param applicationScope */ void setApplicationScope(final ApplicationScope applicationScope); 
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/23acdd50/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterFactory.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterFactory.java deleted file mode 100644 index 8f9776e..0000000 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterFactory.java +++ /dev/null @@ -1,102 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.usergrid.corepersistence.pipeline.read; - - -import org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.QueryCollectionElasticSearchCollectorFilter; -import org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.QueryConnectionElasticSearchCollectorFilter; -import org.apache.usergrid.corepersistence.pipeline.read.entity.EntityIdFilter; -import org.apache.usergrid.corepersistence.pipeline.read.entity.EntityLoadCollectorFilter; -import org.apache.usergrid.corepersistence.pipeline.read.graph.ReadGraphCollectionByIdFilter; -import org.apache.usergrid.corepersistence.pipeline.read.graph.ReadGraphCollectionFilter; -import org.apache.usergrid.corepersistence.pipeline.read.graph.ReadGraphConnectionByIdFilter; -import org.apache.usergrid.corepersistence.pipeline.read.graph.ReadGraphConnectionByTypeFilter; -import org.apache.usergrid.corepersistence.pipeline.read.graph.ReadGraphConnectionFilter; -import org.apache.usergrid.persistence.model.entity.Id; - -import com.google.common.base.Optional; - - -/** - * A factory for generating read commands - */ -public interface FilterFactory { - - - /** - * Generate a new instance of the command with the specified parameters - */ - ReadGraphCollectionFilter readGraphCollectionCommand( final String collectionName ); - - /** - * Read a connection between two entities, the incoming and the target entity - * @param collectionName - * @param targetId - * @return - */ - ReadGraphCollectionByIdFilter readGraphCollectionByIdFilter(final String collectionName, final Id targetId); - - /** - * Generate a new instance of the command with the specified parameters - */ - ReadGraphConnectionFilter readGraphConnectionCommand( final String connectionName ); - - /** - * Generate a new instance of the command with the specified parameters - */ - ReadGraphConnectionByTypeFilter readGraphConnectionCommand( final String connectionName, final String entityType ); - - - /** - * Read a connection directly between 
two identifiers - * @param connectionName - * @param targetId - * @return - */ - ReadGraphConnectionByIdFilter readGraphConnectionByIdFilter(final String connectionName, final Id targetId); - - /** - * Generate a new instance of the command with the specified parameters - */ - EntityLoadCollectorFilter entityLoadCollector(); - - /** - * Generate a new instance of the command with the specified parameters - */ - QueryCollectionElasticSearchCollectorFilter queryCollectionElasticSearchCollector( final String collectionName, final String query); - - - /** - * Generate a new instance of the command with the specified parameters - */ - QueryConnectionElasticSearchCollectorFilter queryConnectionElasticSearchCollector( final String connectionName,final String query); - - - /** - * Generate a new instance of the command with the specified parameters - */ - QueryConnectionElasticSearchCollectorFilter queryConnectionElasticSearchCollector( final String connectionName, final String connectionEntityType, final String query ); - - - /** - * Get an entity id filter. Used as a 1.0->2.0 bridge since we're not doing full traversals - */ - EntityIdFilter getEntityIdFilter( final Id entityId ); -} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/23acdd50/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadFilterFactory.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadFilterFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadFilterFactory.java new file mode 100644 index 0000000..92bdacb --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadFilterFactory.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.corepersistence.pipeline.read; + + +import org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.QueryCollectionElasticSearchCollectorFilter; +import org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.QueryConnectionElasticSearchCollectorFilter; +import org.apache.usergrid.corepersistence.pipeline.read.entity.EntityIdFilter; +import org.apache.usergrid.corepersistence.pipeline.read.entity.EntityLoadCollectorFilter; +import org.apache.usergrid.corepersistence.pipeline.read.graph.ReadGraphCollectionByIdFilter; +import org.apache.usergrid.corepersistence.pipeline.read.graph.ReadGraphCollectionFilter; +import org.apache.usergrid.corepersistence.pipeline.read.graph.ReadGraphConnectionByIdFilter; +import org.apache.usergrid.corepersistence.pipeline.read.graph.ReadGraphConnectionByTypeFilter; +import org.apache.usergrid.corepersistence.pipeline.read.graph.ReadGraphConnectionFilter; +import org.apache.usergrid.persistence.model.entity.Id; + +import com.google.common.base.Optional; + + +/** + * A factory for generating read commands + */ +public interface ReadFilterFactory { + + + /** + * Generate a new instance of the command with the specified parameters + */ + ReadGraphCollectionFilter readGraphCollectionCommand( final 
String collectionName ); + + /** + * Read a connection between two entities, the incoming and the target entity + * @param collectionName + * @param targetId + * @return + */ + ReadGraphCollectionByIdFilter readGraphCollectionByIdFilter(final String collectionName, final Id targetId); + + /** + * Generate a new instance of the command with the specified parameters + */ + ReadGraphConnectionFilter readGraphConnectionCommand( final String connectionName ); + + /** + * Generate a new instance of the command with the specified parameters + */ + ReadGraphConnectionByTypeFilter readGraphConnectionCommand( final String connectionName, final String entityType ); + + + /** + * Read a connection directly between two identifiers + * @param connectionName + * @param targetId + * @return + */ + ReadGraphConnectionByIdFilter readGraphConnectionByIdFilter(final String connectionName, final Id targetId); + + /** + * Generate a new instance of the command with the specified parameters + */ + EntityLoadCollectorFilter entityLoadCollector(); + + /** + * Generate a new instance of the command with the specified parameters + */ + QueryCollectionElasticSearchCollectorFilter queryCollectionElasticSearchCollector( final String collectionName, final String query); + + + /** + * Generate a new instance of the command with the specified parameters + */ + QueryConnectionElasticSearchCollectorFilter queryConnectionElasticSearchCollector( final String connectionName,final String query); + + + /** + * Generate a new instance of the command with the specified parameters + */ + QueryConnectionElasticSearchCollectorFilter queryConnectionElasticSearchCollector( final String connectionName, final String connectionEntityType, final String query ); + + + /** + * Get an entity id filter. 
Used as a 1.0->2.0 bridge since we're not doing full traversals + */ + EntityIdFilter getEntityIdFilter( final Id entityId ); +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/23acdd50/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadFilterFactoryImpl.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadFilterFactoryImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadFilterFactoryImpl.java new file mode 100644 index 0000000..19162bb --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadFilterFactoryImpl.java @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.usergrid.corepersistence.pipeline.read; + + +import org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.QueryCollectionElasticSearchCollectorFilter; +import org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.QueryConnectionElasticSearchCollectorFilter; +import org.apache.usergrid.corepersistence.pipeline.read.entity.EntityIdFilter; +import org.apache.usergrid.corepersistence.pipeline.read.entity.EntityLoadCollectorFilter; +import org.apache.usergrid.corepersistence.pipeline.read.graph.ReadGraphCollectionByIdFilter; +import org.apache.usergrid.corepersistence.pipeline.read.graph.ReadGraphCollectionFilter; +import org.apache.usergrid.corepersistence.pipeline.read.graph.ReadGraphConnectionByIdFilter; +import org.apache.usergrid.corepersistence.pipeline.read.graph.ReadGraphConnectionByTypeFilter; +import org.apache.usergrid.corepersistence.pipeline.read.graph.ReadGraphConnectionFilter; +import org.apache.usergrid.persistence.Query; +import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory; +import org.apache.usergrid.persistence.graph.GraphManagerFactory; +import org.apache.usergrid.persistence.index.EntityIndexFactory; +import org.apache.usergrid.persistence.model.entity.Id; + +import com.google.inject.Inject; +import com.google.inject.Singleton; + + +@Singleton +public class ReadFilterFactoryImpl implements ReadFilterFactory { + + + private final GraphManagerFactory graphManagerFactory; + private final EntityIndexFactory entityIndexFactory; + private final EntityCollectionManagerFactory entityCollectionManagerFactory; + + + @Inject + public ReadFilterFactoryImpl( final GraphManagerFactory graphManagerFactory, + final EntityIndexFactory entityIndexFactory, + final EntityCollectionManagerFactory entityCollectionManagerFactory ) { + + + this.graphManagerFactory = graphManagerFactory; + this.entityIndexFactory = entityIndexFactory; + this.entityCollectionManagerFactory = 
entityCollectionManagerFactory; + } + + + @Override + public ReadGraphCollectionFilter readGraphCollectionCommand( final String collectionName ) { + return new ReadGraphCollectionFilter( graphManagerFactory, collectionName ); + } + + + @Override + public ReadGraphCollectionByIdFilter readGraphCollectionByIdFilter( final String collectionName, + final Id targetId ) { + return new ReadGraphCollectionByIdFilter( graphManagerFactory, collectionName, targetId ); + } + + + @Override + public ReadGraphConnectionFilter readGraphConnectionCommand( final String connectionName ) { + return new ReadGraphConnectionFilter( graphManagerFactory, connectionName ); + } + + + @Override + public ReadGraphConnectionByTypeFilter readGraphConnectionCommand( final String connectionName, + final String entityType ) { + return new ReadGraphConnectionByTypeFilter( graphManagerFactory, connectionName, entityType ); + } + + + @Override + public ReadGraphConnectionByIdFilter readGraphConnectionByIdFilter( final String connectionName, + final Id targetId ) { + return new ReadGraphConnectionByIdFilter( graphManagerFactory, connectionName, targetId ); + } + + + @Override + public EntityLoadCollectorFilter entityLoadCollector() { + return new EntityLoadCollectorFilter( entityCollectionManagerFactory ); + } + + + /** + * TODO refactor these impls to use RX internally, as well as remove the query object + */ + @Override + public QueryCollectionElasticSearchCollectorFilter queryCollectionElasticSearchCollector( + final String collectionName, final String query ) { + + final Query queryObject = Query.fromQL( query ); + + final QueryCollectionElasticSearchCollectorFilter filter = + new QueryCollectionElasticSearchCollectorFilter( entityCollectionManagerFactory, entityIndexFactory, + collectionName, queryObject ); + + return filter; + } + + + @Override + public QueryConnectionElasticSearchCollectorFilter queryConnectionElasticSearchCollector( + final String connectionName, final String query ) { + + 
final Query queryObject = Query.fromQL( query ); + + final QueryConnectionElasticSearchCollectorFilter filter = + new QueryConnectionElasticSearchCollectorFilter( entityCollectionManagerFactory, entityIndexFactory, + connectionName, queryObject ); + + return filter; + } + + + @Override + public QueryConnectionElasticSearchCollectorFilter queryConnectionElasticSearchCollector( + final String connectionName, final String connectionEntityType, final String query ) { + + final Query queryObject = Query.fromQL( query ); + queryObject.setConnectionType( connectionEntityType ); + + final QueryConnectionElasticSearchCollectorFilter filter = + new QueryConnectionElasticSearchCollectorFilter( entityCollectionManagerFactory, entityIndexFactory, + connectionName, queryObject ); + + return filter; + } + + + @Override + public EntityIdFilter getEntityIdFilter( final Id entityId ) { + return new EntityIdFilter( entityId ); + } +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/23acdd50/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadPipelineBuilder.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadPipelineBuilder.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadPipelineBuilder.java index d40cb12..5d83dac 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadPipelineBuilder.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadPipelineBuilder.java @@ -54,6 +54,7 @@ public interface ReadPipelineBuilder { */ ReadPipelineBuilder setStartId(final Id id); + /** * Add a get entity to the pipeline */ @@ -92,7 +93,7 @@ public interface ReadPipelineBuilder { /** - * Get all entities in a connection with a query + * Get all entities in a connection with a query and a target entity type */ ReadPipelineBuilder 
connectionWithQuery( final String connectionName, final String entityType, final String query); http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/23acdd50/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadPipelineBuilderImpl.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadPipelineBuilderImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadPipelineBuilderImpl.java index 7319f21..7ffe957 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadPipelineBuilderImpl.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadPipelineBuilderImpl.java @@ -39,7 +39,7 @@ import rx.Observable; public class ReadPipelineBuilderImpl implements ReadPipelineBuilder { - private final FilterFactory filterFactory; + private final ReadFilterFactory readFilterFactory; private final DataPipeline pipeline; @@ -54,11 +54,16 @@ public class ReadPipelineBuilderImpl implements ReadPipelineBuilder { @Inject - public ReadPipelineBuilderImpl( final FilterFactory filterFactory, + public ReadPipelineBuilderImpl( final ReadFilterFactory readFilterFactory, @Assisted final ApplicationScope applicationScope ) { - this.filterFactory = filterFactory; + this.readFilterFactory = readFilterFactory; + + //set up our pipeline with our application scope this.pipeline = new DataPipeline( applicationScope ); + + //init our cursor to empty this.cursor = Optional.absent(); + //set the default limit this.limit = Optional.absent(); } @@ -84,7 +89,7 @@ public class ReadPipelineBuilderImpl implements ReadPipelineBuilder { @Override public ReadPipelineBuilder setStartId( final Id id ) { - pipeline.withTraverseCommand( filterFactory.getEntityIdFilter( id ) ); + pipeline.withTraverseCommand( readFilterFactory.getEntityIdFilter( id ) ); this.collectorFilter = 
null; @@ -96,7 +101,7 @@ public class ReadPipelineBuilderImpl implements ReadPipelineBuilder { @Override public ReadPipelineBuilder getEntityViaCollection( final String collectionName, final Id entityId ) { - pipeline.withTraverseCommand( filterFactory.readGraphCollectionByIdFilter( collectionName, entityId ) ); + pipeline.withTraverseCommand( readFilterFactory.readGraphCollectionByIdFilter( collectionName, entityId ) ); setEntityLoaderFilter(); @@ -108,7 +113,7 @@ public class ReadPipelineBuilderImpl implements ReadPipelineBuilder { public ReadPipelineBuilder getCollection( final String collectionName ) { - pipeline.withTraverseCommand( filterFactory.readGraphCollectionCommand( collectionName ) ); + pipeline.withTraverseCommand( readFilterFactory.readGraphCollectionCommand( collectionName ) ); setEntityLoaderFilter(); @@ -120,7 +125,7 @@ public class ReadPipelineBuilderImpl implements ReadPipelineBuilder { public ReadPipelineBuilder getCollectionWithQuery( final String collectionName, final String query ) { //TODO, this should really be 2 a TraverseFilter with an entityLoad collector - collectorFilter = filterFactory.queryCollectionElasticSearchCollector( collectionName, query ); + collectorFilter = readFilterFactory.queryCollectionElasticSearchCollector( collectionName, query ); return this; } @@ -128,7 +133,7 @@ public class ReadPipelineBuilderImpl implements ReadPipelineBuilder { @Override public ReadPipelineBuilder getEntityViaConnection( final String connectionName, final Id entityId ) { - pipeline.withTraverseCommand( filterFactory.readGraphConnectionByIdFilter( connectionName, entityId ) ); + pipeline.withTraverseCommand( readFilterFactory.readGraphConnectionByIdFilter( connectionName, entityId ) ); setEntityLoaderFilter(); return this; @@ -138,7 +143,7 @@ public class ReadPipelineBuilderImpl implements ReadPipelineBuilder { @Override public ReadPipelineBuilder getConnection( final String connectionName ) { - pipeline.withTraverseCommand( 
filterFactory.readGraphConnectionCommand( connectionName ) ); + pipeline.withTraverseCommand( readFilterFactory.readGraphConnectionCommand( connectionName ) ); setEntityLoaderFilter(); return this; @@ -147,7 +152,7 @@ public class ReadPipelineBuilderImpl implements ReadPipelineBuilder { @Override public ReadPipelineBuilder getConnection( final String connectionName, final String entityType ) { - pipeline.withTraverseCommand( filterFactory.readGraphConnectionCommand( connectionName, entityType ) ); + pipeline.withTraverseCommand( readFilterFactory.readGraphConnectionCommand( connectionName, entityType ) ); setEntityLoaderFilter(); return this; @@ -164,7 +169,7 @@ public class ReadPipelineBuilderImpl implements ReadPipelineBuilder { public ReadPipelineBuilder connectionWithQuery( final String connectionName, final String query ) { //TODO, this should really be 2 a TraverseFilter with an entityLoad collector - collectorFilter = filterFactory.queryConnectionElasticSearchCollector( connectionName, query ); + collectorFilter = readFilterFactory.queryConnectionElasticSearchCollector( connectionName, query ); return this; } @@ -176,7 +181,7 @@ public class ReadPipelineBuilderImpl implements ReadPipelineBuilder { //TODO, this should really be 2 a TraverseFilter with an entityLoad collector collectorFilter = - filterFactory.queryConnectionElasticSearchCollector( connectionName, entityType, query); + readFilterFactory.queryConnectionElasticSearchCollector( connectionName, entityType, query); return this; } @@ -192,6 +197,6 @@ public class ReadPipelineBuilderImpl implements ReadPipelineBuilder { private void setEntityLoaderFilter() { - collectorFilter = filterFactory.entityLoadCollector(); + collectorFilter = readFilterFactory.entityLoadCollector(); } } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/23acdd50/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/AbstractQueryElasticSearchCollectorFilter.java 
---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/AbstractQueryElasticSearchCollectorFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/AbstractQueryElasticSearchCollectorFilter.java index 01ab21f..a843f51 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/AbstractQueryElasticSearchCollectorFilter.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/AbstractQueryElasticSearchCollectorFilter.java @@ -29,8 +29,8 @@ import org.apache.usergrid.persistence.Query; import org.apache.usergrid.persistence.Results; import org.apache.usergrid.persistence.SimpleEntityRef; import org.apache.usergrid.persistence.index.ApplicationEntityIndex; +import org.apache.usergrid.persistence.index.EntityIndexFactory; import org.apache.usergrid.persistence.index.SearchEdge; -import org.apache.usergrid.persistence.index.SearchType; import org.apache.usergrid.persistence.index.SearchTypes; import org.apache.usergrid.persistence.model.entity.Id; @@ -54,14 +54,14 @@ public abstract class AbstractQueryElasticSearchCollectorFilter extends Abstract implements CollectorFilter<Results> { - protected final ApplicationEntityIndex entityIndex; + protected final EntityIndexFactory applicationEntityIndex; protected final Query query; private int limit; @Inject - protected AbstractQueryElasticSearchCollectorFilter( final ApplicationEntityIndex entityIndex, final Query query ) { - this.entityIndex = entityIndex; + protected AbstractQueryElasticSearchCollectorFilter( final EntityIndexFactory applicationEntityIndex, final Query query ) { + this.applicationEntityIndex = applicationEntityIndex; this.query = query; } @@ -69,6 +69,10 @@ public abstract class AbstractQueryElasticSearchCollectorFilter extends Abstract @Override public 
Observable<Results> call( final Observable<Id> idObservable ) { + + final ApplicationEntityIndex + entityIndex = applicationEntityIndex.createApplicationEntityIndex( applicationScope ); + return idObservable.flatMap( id -> { //TODO, refactor this logic to use Observables. make this a TraverseFilter and load entities with the entity loader collector @@ -76,6 +80,8 @@ public abstract class AbstractQueryElasticSearchCollectorFilter extends Abstract final SearchEdge searchEdge = getSearchEdge( id ); final SearchTypes searchTypes = getSearchTypes(); + + final Iterable<Results> executor = new ElasticSearchQueryExecutor( resultsLoaderFactory, entityIndex, applicationScope, searchEdge, searchTypes, query.withLimit( limit ) ); http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/23acdd50/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/QueryCollectionElasticSearchCollectorFilter.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/QueryCollectionElasticSearchCollectorFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/QueryCollectionElasticSearchCollectorFilter.java index f0e8139..4813978 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/QueryCollectionElasticSearchCollectorFilter.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/QueryCollectionElasticSearchCollectorFilter.java @@ -25,7 +25,9 @@ import org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.impl.Resu import org.apache.usergrid.persistence.EntityRef; import org.apache.usergrid.persistence.Query; import org.apache.usergrid.persistence.collection.EntityCollectionManager; +import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory; import 
org.apache.usergrid.persistence.index.ApplicationEntityIndex; +import org.apache.usergrid.persistence.index.EntityIndexFactory; import org.apache.usergrid.persistence.index.SearchEdge; import org.apache.usergrid.persistence.index.SearchTypes; import org.apache.usergrid.persistence.model.entity.Id; @@ -41,18 +43,18 @@ import static org.apache.usergrid.corepersistence.util.CpNamingUtils.createColle */ public class QueryCollectionElasticSearchCollectorFilter extends AbstractQueryElasticSearchCollectorFilter { - private final EntityCollectionManager entityCollectionManager; - private final ApplicationEntityIndex applicationEntityIndex; + private final EntityCollectionManagerFactory entityCollectionManagerFactory; + private final EntityIndexFactory entityIndexFactory; private final String collectionName; @Inject - protected QueryCollectionElasticSearchCollectorFilter( final EntityCollectionManager entityCollectionManager, - final ApplicationEntityIndex applicationEntityIndex, + public QueryCollectionElasticSearchCollectorFilter( final EntityCollectionManagerFactory entityCollectionManagerFactory, + final EntityIndexFactory entityIndexFactory, @Assisted final String collectionName , @Assisted final Query query ) { - super( applicationEntityIndex, query ); - this.entityCollectionManager = entityCollectionManager; - this.applicationEntityIndex = applicationEntityIndex; + super( entityIndexFactory, query ); + this.entityIndexFactory = entityIndexFactory; + this.entityCollectionManagerFactory = entityCollectionManagerFactory; this.collectionName = collectionName; } @@ -75,9 +77,11 @@ public class QueryCollectionElasticSearchCollectorFilter extends AbstractQueryEl @Override protected ResultsLoaderFactory getResultsLoaderFactory( final Id id ) { + final EntityCollectionManager entityCollectionManager = entityCollectionManagerFactory.createCollectionManager( applicationScope ); + final ApplicationEntityIndex entityIndex = entityIndexFactory.createApplicationEntityIndex( 
applicationScope ); final EntityRef entityRef = getRef( id ); - return new ConnectionResultsLoaderFactoryImpl( entityCollectionManager, applicationEntityIndex, entityRef, + return new ConnectionResultsLoaderFactoryImpl( entityCollectionManager, entityIndex, entityRef, collectionName ); } } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/23acdd50/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/QueryConnectionElasticSearchCollectorFilter.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/QueryConnectionElasticSearchCollectorFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/QueryConnectionElasticSearchCollectorFilter.java index 2b380df..2f7a6b3 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/QueryConnectionElasticSearchCollectorFilter.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/QueryConnectionElasticSearchCollectorFilter.java @@ -25,7 +25,9 @@ import org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.impl.Resu import org.apache.usergrid.persistence.EntityRef; import org.apache.usergrid.persistence.Query; import org.apache.usergrid.persistence.collection.EntityCollectionManager; +import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory; import org.apache.usergrid.persistence.index.ApplicationEntityIndex; +import org.apache.usergrid.persistence.index.EntityIndexFactory; import org.apache.usergrid.persistence.index.SearchEdge; import org.apache.usergrid.persistence.index.SearchTypes; import org.apache.usergrid.persistence.model.entity.Id; @@ -41,19 +43,19 @@ import static org.apache.usergrid.corepersistence.util.CpNamingUtils.createConne */ public class 
QueryConnectionElasticSearchCollectorFilter extends AbstractQueryElasticSearchCollectorFilter { - private final EntityCollectionManager entityCollectionManager; - private final ApplicationEntityIndex applicationEntityIndex; + private final EntityCollectionManagerFactory entityCollectionManagerFactory; + private final EntityIndexFactory entityIndexFactory; private final String connectionName; @Inject - protected QueryConnectionElasticSearchCollectorFilter( final EntityCollectionManager entityCollectionManager, - final ApplicationEntityIndex applicationEntityIndex, - @Assisted final String connectionName, - @Assisted final Query query ) { - super( applicationEntityIndex, query ); - this.entityCollectionManager = entityCollectionManager; - this.applicationEntityIndex = applicationEntityIndex; + public QueryConnectionElasticSearchCollectorFilter( + final EntityCollectionManagerFactory entityCollectionManagerFactory, + final EntityIndexFactory entityIndexFactory, @Assisted final String connectionName, + @Assisted final Query query ) { + super( entityIndexFactory, query ); + this.entityCollectionManagerFactory = entityCollectionManagerFactory; + this.entityIndexFactory = entityIndexFactory; this.connectionName = connectionName; } @@ -77,8 +79,13 @@ public class QueryConnectionElasticSearchCollectorFilter extends AbstractQueryEl @Override protected ResultsLoaderFactory getResultsLoaderFactory( final Id id ) { + + final EntityCollectionManager entityCollectionManager = + entityCollectionManagerFactory.createCollectionManager( applicationScope ); + final ApplicationEntityIndex entityIndex = entityIndexFactory.createApplicationEntityIndex( applicationScope ); + final EntityRef entityRef = getRef( id ); - return new ConnectionResultsLoaderFactoryImpl( entityCollectionManager, applicationEntityIndex, entityRef, + return new ConnectionResultsLoaderFactoryImpl( entityCollectionManager, entityIndex, entityRef, connectionName ); } } 
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/23acdd50/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/entity/EntityLoadCollectorFilter.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/entity/EntityLoadCollectorFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/entity/EntityLoadCollectorFilter.java index 74f626d..3b140ae 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/entity/EntityLoadCollectorFilter.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/entity/EntityLoadCollectorFilter.java @@ -50,15 +50,12 @@ public class EntityLoadCollectorFilter extends AbstractFilter<Results, Serializa implements CollectorFilter<Results> { private final EntityCollectionManagerFactory entityCollectionManagerFactory; - private final ApplicationScope applicationScope; private int resultSize; @Inject - public EntityLoadCollectorFilter( final EntityCollectionManagerFactory entityCollectionManagerFactory, - final ApplicationScope applicationScope ) { + public EntityLoadCollectorFilter( final EntityCollectionManagerFactory entityCollectionManagerFactory ) { this.entityCollectionManagerFactory = entityCollectionManagerFactory; - this.applicationScope = applicationScope; } @@ -92,7 +89,7 @@ public class EntityLoadCollectorFilter extends AbstractFilter<Results, Serializa final Observable<MvccEntity> mvccEntityObservable = Observable.from( entitySet.getEntities() ); //convert them to our old entity model, then filter nulls, meaning they weren't found - return mvccEntityObservable.map( mvccEntity -> mapEntity( mvccEntity ) ).filter( entity -> entity == null ) + return mvccEntityObservable.map( mvccEntity -> mapEntity( mvccEntity ) ).filter( entity -> entity != null ) //convert them to a list, then map them into results 
.toList().map( entities -> { @@ -100,9 +97,7 @@ public class EntityLoadCollectorFilter extends AbstractFilter<Results, Serializa results.setCursor( generateCursor() ); return results; - } ) - //if no results are present, return an empty results - .singleOrDefault( new Results( ) ); + } ); } ); http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/23acdd50/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphFilter.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphFilter.java index 4bdcdad..06aae83 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphFilter.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphFilter.java @@ -32,22 +32,18 @@ import org.apache.usergrid.persistence.model.entity.Id; import com.google.common.base.Optional; import rx.Observable; -import rx.functions.Action1; -import rx.functions.Func1; /** * Command for reading graph edges */ -public abstract class AbstractReadGraphFilter extends AbstractFilter<Id, Edge> - implements TraverseFilter { +public abstract class AbstractReadGraphFilter extends AbstractFilter<Id, Edge> implements TraverseFilter { private final GraphManagerFactory graphManagerFactory; /** * Create a new instance of our command - * @param graphManagerFactory */ public AbstractReadGraphFilter( final GraphManagerFactory graphManagerFactory ) { this.graphManagerFactory = graphManagerFactory; @@ -67,20 +63,20 @@ public abstract class AbstractReadGraphFilter extends AbstractFilter<Id, Edge> //return all ids that are emitted from this edge - return observable.flatMap( new Func1<Id, Observable<Id>>() { - - @Override - public 
Observable<Id> call( final Id id ) { - - final SimpleSearchByEdgeType search = new SimpleSearchByEdgeType(id,edgeName, Long.MAX_VALUE, - SearchByEdgeType.Order.DESCENDING, startFromCursor ); - - /** - * TODO, pass a message with pointers to our cursor values to be generated later - */ - return graphManager.loadEdgesFromSource( search ).doOnNext( edge -> setCursor( edge ) ).map( - edge -> edge.getTargetNode() ); - } + return observable.flatMap( id -> { + + final SimpleSearchByEdgeType search = + new SimpleSearchByEdgeType( id, edgeName, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING, + startFromCursor ); + + /** + * TODO, pass a message with pointers to our cursor values to be generated later + */ + return graphManager.loadEdgesFromSource( search ) + //set our cursor every edge we traverse + .doOnNext( edge -> setCursor( edge ) ) + //map our id from the target edge + .map( edge -> edge.getTargetNode() ); } ); } @@ -91,10 +87,8 @@ public abstract class AbstractReadGraphFilter extends AbstractFilter<Id, Edge> } - /** * Get the edge type name we should use when traversing - * @return */ protected abstract String getEdgeTypeName(); } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/23acdd50/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphCollectionByIdFilter.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphCollectionByIdFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphCollectionByIdFilter.java index 2ccad67..da6ad29 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphCollectionByIdFilter.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphCollectionByIdFilter.java @@ -24,6 +24,7 @@ import 
org.apache.usergrid.corepersistence.util.CpNamingUtils; import org.apache.usergrid.persistence.graph.GraphManagerFactory; import org.apache.usergrid.persistence.model.entity.Id; +import com.google.inject.Inject; import com.google.inject.assistedinject.Assisted; @@ -34,6 +35,7 @@ public class ReadGraphCollectionByIdFilter extends AbstractReadGraphEdgeByIdFilt private final String collectionName; + @Inject public ReadGraphCollectionByIdFilter( final GraphManagerFactory graphManagerFactory, @Assisted final String collectionName, @Assisted final Id targetId ) { super( graphManagerFactory, targetId ); this.collectionName = collectionName; http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/23acdd50/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphCollectionFilter.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphCollectionFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphCollectionFilter.java index 3f0a70a..91ae7c3 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphCollectionFilter.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphCollectionFilter.java @@ -25,7 +25,7 @@ import org.apache.usergrid.persistence.graph.GraphManagerFactory; import com.google.inject.Inject; import com.google.inject.assistedinject.Assisted; -import static org.apache.usergrid.corepersistence.util.CpNamingUtils.getCollectionScopeNameFromCollectionName; +import static org.apache.usergrid.corepersistence.util.CpNamingUtils.getEdgeTypeFromCollectionName; /** @@ -48,6 +48,6 @@ public class ReadGraphCollectionFilter extends AbstractReadGraphFilter { @Override protected String getEdgeTypeName() { - return getCollectionScopeNameFromCollectionName( collectionName ); + 
return getEdgeTypeFromCollectionName( collectionName ); } } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/23acdd50/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphConnectionByIdFilter.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphConnectionByIdFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphConnectionByIdFilter.java index f16867d..4756d33 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphConnectionByIdFilter.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphConnectionByIdFilter.java @@ -24,6 +24,7 @@ import org.apache.usergrid.corepersistence.util.CpNamingUtils; import org.apache.usergrid.persistence.graph.GraphManagerFactory; import org.apache.usergrid.persistence.model.entity.Id; +import com.google.inject.Inject; import com.google.inject.assistedinject.Assisted; @@ -34,6 +35,7 @@ public class ReadGraphConnectionByIdFilter extends AbstractReadGraphEdgeByIdFilt private final String connectionName; + @Inject public ReadGraphConnectionByIdFilter( final GraphManagerFactory graphManagerFactory, @Assisted final String connectionName, @Assisted final Id targetId ) { super( graphManagerFactory, targetId ); http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/23acdd50/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphConnectionByTypeFilter.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphConnectionByTypeFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphConnectionByTypeFilter.java index 
3057111..d5cdd66 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphConnectionByTypeFilter.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphConnectionByTypeFilter.java @@ -30,13 +30,12 @@ import org.apache.usergrid.persistence.graph.impl.SimpleSearchByIdType; import org.apache.usergrid.persistence.model.entity.Id; import com.google.common.base.Optional; +import com.google.inject.Inject; import com.google.inject.assistedinject.Assisted; import rx.Observable; -import rx.functions.Action1; -import rx.functions.Func1; -import static org.apache.usergrid.corepersistence.util.CpNamingUtils.getConnectionScopeName; +import static org.apache.usergrid.corepersistence.util.CpNamingUtils.getEdgeTypeFromConnectionType; /** @@ -52,6 +51,7 @@ public class ReadGraphConnectionByTypeFilter extends AbstractFilter<Id, Edge> im /** * Create a new instance of our command */ + @Inject public ReadGraphConnectionByTypeFilter( final GraphManagerFactory graphManagerFactory, @Assisted final String connectionName, @Assisted final String entityType ) { this.graphManagerFactory = graphManagerFactory; @@ -69,7 +69,7 @@ public class ReadGraphConnectionByTypeFilter extends AbstractFilter<Id, Edge> im //set our our constant state final Optional<Edge> startFromCursor = getCursor(); - final String edgeName = getConnectionScopeName( connectionName ); + final String edgeName = getEdgeTypeFromConnectionType( connectionName ); //return all ids that are emitted from this edge http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/23acdd50/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphConnectionFilter.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphConnectionFilter.java 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphConnectionFilter.java index 49360f6..0d4971b 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphConnectionFilter.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphConnectionFilter.java @@ -25,7 +25,7 @@ import org.apache.usergrid.persistence.graph.GraphManagerFactory; import com.google.inject.Inject; import com.google.inject.assistedinject.Assisted; -import static org.apache.usergrid.corepersistence.util.CpNamingUtils.getConnectionScopeName; +import static org.apache.usergrid.corepersistence.util.CpNamingUtils.getEdgeTypeFromConnectionType; /** @@ -48,6 +48,6 @@ public class ReadGraphConnectionFilter extends AbstractReadGraphFilter { @Override protected String getEdgeTypeName() { - return getConnectionScopeName( connectionName ); + return getEdgeTypeFromConnectionType( connectionName ); } } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/23acdd50/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ObservableQueryExecutor.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ObservableQueryExecutor.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ObservableQueryExecutor.java index 382fb64..f5a2893 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ObservableQueryExecutor.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ObservableQueryExecutor.java @@ -40,7 +40,8 @@ public class ObservableQueryExecutor implements QueryExecutor { public ObservableQueryExecutor( final Observable<Results> resultsObservable ) { - this.resultsObservable = resultsObservable; + //if no values, we must emit an empty results so add the default + this.resultsObservable = 
resultsObservable.defaultIfEmpty( new Results() ); } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/23acdd50/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpNamingUtils.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpNamingUtils.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpNamingUtils.java index 5472071..c65db09 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpNamingUtils.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpNamingUtils.java @@ -236,10 +236,10 @@ public class CpNamingUtils { if ( isCollectionEdgeType( edgeName ) ) { - return getCollectionScopeNameFromCollectionName( getCollectionName( edgeName ) ); + return getCollectionName( edgeName ) ; } - return getConnectionScopeName( getConnectionType( edgeName ) ); + return getConnectionType( edgeName ) ; } @@ -260,20 +260,4 @@ public class CpNamingUtils { } - /** - * Only use when searching graph. When searching ES use the <see>createCollectionSearchEdge</see> - */ - public static String getCollectionScopeNameFromCollectionName( String name ) { - String csn = EDGE_COLL_SUFFIX + name; - return csn.toLowerCase(); - } - - - /** - * Only use when searching graph. 
When searching ES use the <see>createConnectionSearchByEdge</see> - */ - public static String getConnectionScopeName( String connectionType ) { - String csn = EDGE_CONN_SUFFIX + connectionType; - return csn.toLowerCase(); - } } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/23acdd50/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/test/util/EdgeTestUtils.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/test/util/EdgeTestUtils.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/test/util/EdgeTestUtils.java index c81fa58..0239f4b 100644 --- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/test/util/EdgeTestUtils.java +++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/test/util/EdgeTestUtils.java @@ -164,7 +164,7 @@ public class EdgeTestUtils { */ public static SearchByIdType createSearchByEdgeAndId( final Id sourceId, final String type, final long maxVersion, final String idType, final Edge last ) { - return new SimpleSearchByIdType( sourceId, type, maxVersion, SearchByEdgeType.Order.DESCENDING, idType, last ); + return new SimpleSearchByIdType( sourceId, type, maxVersion, SearchByEdgeType.Order.DESCENDING, idType, Optional.of(last) ); } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/23acdd50/stack/test-utils/src/main/java/org/apache/usergrid/setup/ConcurrentProcessSingleton.java ---------------------------------------------------------------------- diff --git a/stack/test-utils/src/main/java/org/apache/usergrid/setup/ConcurrentProcessSingleton.java b/stack/test-utils/src/main/java/org/apache/usergrid/setup/ConcurrentProcessSingleton.java index 7cb5073..214334b 100644 --- a/stack/test-utils/src/main/java/org/apache/usergrid/setup/ConcurrentProcessSingleton.java +++ 
b/stack/test-utils/src/main/java/org/apache/usergrid/setup/ConcurrentProcessSingleton.java @@ -43,8 +43,10 @@ public class ConcurrentProcessSingleton { public static final int LOCK_PORT = Integer.parseInt( System.getProperty( "test.lock.port", "10101" ) ); - public static final boolean CLEAN_STORAGE = - Boolean.parseBoolean( System.getProperty( "test.clean.storage", "false" ) ); +// public static final boolean CLEAN_STORAGE = +// Boolean.parseBoolean( System.getProperty( "test.clean.storage", "false" ) ); + + public static final boolean CLEAN_STORAGE = false; public static final long ONE_MINUTE = 60000;