Initial setup of command pattern and wiring into legacy code
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/6098d9b6 Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/6098d9b6 Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/6098d9b6 Branch: refs/heads/two-dot-o-dev Commit: 6098d9b60a2de306beb02d34f1bf34a42c67285f Parents: 472ccaf Author: Todd Nine <tn...@apigee.com> Authored: Thu Apr 23 16:20:12 2015 -0600 Committer: Todd Nine <tn...@apigee.com> Committed: Thu Apr 23 22:49:05 2015 -0600 ---------------------------------------------------------------------- stack/core/pom.xml | 12 -- .../corepersistence/CpEntityManager.java | 29 +-- .../corepersistence/CpEntityManagerFactory.java | 21 +- .../corepersistence/CpRelationManager.java | 71 +++++-- .../usergrid/corepersistence/CpWalker.java | 4 +- .../corepersistence/command/CommandBuilder.java | 111 +++++++++++ .../command/cursor/CursorSerializer.java | 86 +++++++++ .../command/cursor/RequestCursor.java | 45 +++++ .../command/cursor/ResponseCursor.java | 56 ++++++ .../command/read/AbstractCommand.java | 111 +++++++++++ .../command/read/CollectCommand.java | 36 ++++ .../corepersistence/command/read/Command.java | 56 ++++++ .../command/read/TraverseCommand.java | 30 +++ .../command/read/entity/EntityLoadCommand.java | 191 +++++++++++++++++++ .../read/graph/AbstractReadGraphCommand.java | 110 +++++++++++ .../read/graph/ReadGraphCollectionCommand.java | 49 +++++ .../read/graph/ReadGraphConnectionCommand.java | 49 +++++ .../results/AbstractGraphQueryExecutor.java | 127 ++++++++++++ .../results/CollectionGraphQueryExecutor.java | 60 ++++++ .../CollectionResultsLoaderFactoryImpl.java | 1 - .../results/ConnectionGraphQueryExecutor.java | 58 ++++++ .../rx/ApplicationObservable.java | 42 ++-- .../rx/EdgesFromSourceObservable.java | 4 +- .../rx/EdgesToTargetObservable.java | 4 +- .../usergrid/persistence/EntityManager.java | 7 +- 
.../cassandra/EntityManagerImpl.java | 45 +++-- .../org/apache/usergrid/CoreApplication.java | 3 + .../persistence/collection/EntitySet.java | 14 +- .../serialization/impl/EntitySetImpl.java | 10 +- .../persistence/core/rx/OrderedMerge.java | 28 +-- .../apache/usergrid/persistence/graph/Edge.java | 4 + .../persistence/graph/SearchByEdgeType.java | 2 +- .../graph/impl/SimpleSearchByEdgeType.java | 26 ++- .../impl/stage/NodeDeleteListenerImpl.java | 5 +- .../impl/shard/DirectedEdgeMeta.java | 7 +- .../persistence/graph/GraphManagerLoadTest.java | 6 +- .../graph/GraphManagerShardConsistencyIT.java | 3 +- .../graph/GraphManagerStressTest.java | 8 +- .../usergrid/persistence/model/entity/Id.java | 3 + stack/corepersistence/pom.xml | 4 +- .../usergrid/persistence/index/query/Query.java | 8 + .../scenarios/ConnectionScenarios.scala | 2 +- stack/pom.xml | 1 - 43 files changed, 1419 insertions(+), 130 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6098d9b6/stack/core/pom.xml ---------------------------------------------------------------------- diff --git a/stack/core/pom.xml b/stack/core/pom.xml index d91208d..f12c0b8 100644 --- a/stack/core/pom.xml +++ b/stack/core/pom.xml @@ -473,18 +473,6 @@ </dependency> <dependency> - <groupId>com.netflix.rxjava</groupId> - <artifactId>rxjava-core</artifactId> - <version>${rx.version}</version> - </dependency> - - <dependency> - <groupId>com.netflix.rxjava</groupId> - <artifactId>rxjava-math</artifactId> - <version>${rx.version}</version> - </dependency> - - <dependency> <groupId>com.clearspring.analytics</groupId> <artifactId>stream</artifactId> <version>2.7.0</version> http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6098d9b6/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java ---------------------------------------------------------------------- diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java index 789e640..038be44 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java @@ -34,6 +34,8 @@ import java.util.TreeSet; import java.util.UUID; import com.codahale.metrics.Meter; + +import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory; import org.apache.usergrid.persistence.collection.FieldSet; import org.apache.usergrid.persistence.core.future.BetterFuture; import org.slf4j.Logger; @@ -86,6 +88,7 @@ import org.apache.usergrid.persistence.exceptions.DuplicateUniquePropertyExistsE import org.apache.usergrid.persistence.exceptions.EntityNotFoundException; import org.apache.usergrid.persistence.exceptions.RequiredPropertyNotFoundException; import org.apache.usergrid.persistence.exceptions.UnexpectedEntityTypeException; +import org.apache.usergrid.persistence.graph.GraphManagerFactory; import org.apache.usergrid.persistence.index.EntityIndex; import org.apache.usergrid.persistence.index.EntityIndexBatch; import org.apache.usergrid.persistence.index.IndexScope; @@ -178,8 +181,6 @@ public class CpEntityManager implements EntityManager { private UUID applicationId; private Application application; - private CpEntityManagerFactory emf; - private ManagerCache managerCache; private ApplicationScope applicationScope; @@ -188,6 +189,10 @@ public class CpEntityManager implements EntityManager { private CounterUtils counterUtils; + + private EntityCollectionManagerFactory entityCollectionManagerFactory; + private GraphManagerFactory graphManagerFactory; + private boolean skipAggregateCounters; private MetricsFactory metricsFactory; private Timer aggCounterTimer; @@ -220,22 +225,24 @@ public class CpEntityManager implements EntityManager { } @Override - public 
void init( EntityManagerFactory emf, UUID applicationId ) { + public void init( final CassandraService cassandraService, final CounterUtils counterUtils, final MetricsFactory metricsFactory, final GraphManagerFactory graphManagerFactory, final EntityCollectionManagerFactory entityCollectionManagerFactory, final ManagerCache managerCache, UUID applicationId ) { - Preconditions.checkNotNull( emf, "emf must not be null" ); Preconditions.checkNotNull( applicationId, "applicationId must not be null" ); - this.emf = ( CpEntityManagerFactory ) emf; - this.managerCache = this.emf.getManagerCache(); + this.managerCache = managerCache; this.applicationId = applicationId; applicationScope = CpNamingUtils.getApplicationScope( applicationId ); - this.cass = this.emf.getCassandraService(); - this.counterUtils = this.emf.getCounterUtils(); + this.cass = cassandraService; + this.counterUtils = counterUtils; //Timer Setup - this.metricsFactory = this.emf.getMetricsFactory(); + this.metricsFactory = metricsFactory; + + this.graphManagerFactory = graphManagerFactory; + this.entityCollectionManagerFactory = entityCollectionManagerFactory; + this.aggCounterTimer =this.metricsFactory.getTimer( CpEntityManager.class, "cp.entity.get.aggregate.counters.timer" ); this.entCreateTimer =this.metricsFactory.getTimer( CpEntityManager.class, "cp.entity.create.timer" ); @@ -766,7 +773,7 @@ public class CpEntityManager implements EntityManager { public RelationManager getRelationManager( EntityRef entityRef ) { Preconditions.checkNotNull( entityRef, "entityRef cannot be null" ); CpRelationManager rmi = new CpRelationManager(); - rmi.init( this, emf, applicationId, entityRef, null, metricsFactory ); + rmi.init( this, applicationId, entityRef, null, metricsFactory, managerCache, entityCollectionManagerFactory, graphManagerFactory ); return rmi; } @@ -2885,7 +2892,7 @@ public class CpEntityManager implements EntityManager { public void refreshIndex() { // refresh factory indexes - emf.refreshIndex(); 
+ // refresh this Entity Manager's application's index EntityIndex ei = managerCache.getEntityIndex( getApplicationScope() ); http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6098d9b6/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java index fe4d828..5b8e715 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java @@ -44,6 +44,7 @@ import org.apache.usergrid.persistence.cassandra.CounterUtils; import org.apache.usergrid.persistence.cassandra.Setup; import org.apache.usergrid.persistence.collection.CollectionScope; import org.apache.usergrid.persistence.collection.EntityCollectionManager; +import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory; import org.apache.usergrid.persistence.collection.impl.CollectionScopeImpl; import org.apache.usergrid.persistence.core.metrics.MetricsFactory; import org.apache.usergrid.persistence.core.migration.data.DataMigrationManager; @@ -57,6 +58,7 @@ import org.apache.usergrid.persistence.exceptions.EntityNotFoundException; import org.apache.usergrid.persistence.exceptions.OrganizationAlreadyExistsException; import org.apache.usergrid.persistence.graph.Edge; import org.apache.usergrid.persistence.graph.GraphManager; +import org.apache.usergrid.persistence.graph.GraphManagerFactory; import org.apache.usergrid.persistence.graph.SearchByEdgeType; import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType; import org.apache.usergrid.persistence.index.EntityIndex; @@ -99,12 +101,14 @@ public class CpEntityManagerFactory implements EntityManagerFactory, 
Application private final OrgApplicationCache orgApplicationCache; - private ManagerCache managerCache; - private DataMigrationManager dataMigrationManager; + private final ManagerCache managerCache; + private final DataMigrationManager dataMigrationManager; + private final GraphManagerFactory graphManagerFactory; + private final EntityCollectionManagerFactory entityCollectionManagerFactory; - private CassandraService cassandraService; - private CounterUtils counterUtils; - private Injector injector; + private final CassandraService cassandraService; + private final CounterUtils counterUtils; + private final Injector injector; private final MetricsFactory metricsFactory; public CpEntityManagerFactory( @@ -115,8 +119,11 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application this.injector = injector; this.managerCache = injector.getInstance( ManagerCache.class ); this.dataMigrationManager = injector.getInstance( DataMigrationManager.class ); + this.graphManagerFactory = injector.getInstance( GraphManagerFactory.class ); + this.entityCollectionManagerFactory = injector.getInstance( EntityCollectionManagerFactory.class ); this.metricsFactory = injector.getInstance( MetricsFactory.class ); + this.orgApplicationCache = new OrgApplicationCacheImpl( this ); } @@ -172,7 +179,7 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application private EntityManager _getEntityManager( UUID applicationId ) { EntityManager em = new CpEntityManager(); - em.init( this, applicationId ); + em.init( cassandraService, counterUtils, metricsFactory, graphManagerFactory, entityCollectionManagerFactory, managerCache, applicationId ); return em; } @@ -427,7 +434,7 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application Observable<Edge> edges = gm.loadEdgesFromSource( new SimpleSearchByEdgeType( fromEntityId, edgeType, Long.MAX_VALUE, - SearchByEdgeType.Order.DESCENDING, null )); + SearchByEdgeType.Order.DESCENDING, 
Optional.<Edge>absent() )); Iterator<Edge> iter = edges.toBlockingObservable().getIterator(); while ( iter.hasNext() ) { http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6098d9b6/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java index da39ea9..edd352e 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java @@ -33,6 +33,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.util.Assert; +import org.apache.usergrid.corepersistence.results.CollectionGraphQueryExecutor; +import org.apache.usergrid.corepersistence.results.ConnectionGraphQueryExecutor; import org.apache.usergrid.corepersistence.results.ConnectionResultsLoaderFactoryImpl; import org.apache.usergrid.corepersistence.results.ElasticSearchQueryExecutor; import org.apache.usergrid.corepersistence.results.QueryExecutor; @@ -61,6 +63,8 @@ import org.apache.usergrid.persistence.cassandra.index.IndexBucketScanner; import org.apache.usergrid.persistence.cassandra.index.IndexScanner; import org.apache.usergrid.persistence.cassandra.index.NoOpIndexScanner; import org.apache.usergrid.persistence.collection.CollectionScope; +import org.apache.usergrid.persistence.collection.EntityCollectionManager; +import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory; import org.apache.usergrid.persistence.core.future.BetterFuture; import org.apache.usergrid.persistence.core.metrics.MetricsFactory; import org.apache.usergrid.persistence.core.scope.ApplicationScope; @@ -71,6 +75,7 @@ import org.apache.usergrid.persistence.geo.EntityLocationRef; 
import org.apache.usergrid.persistence.geo.model.Point; import org.apache.usergrid.persistence.graph.Edge; import org.apache.usergrid.persistence.graph.GraphManager; +import org.apache.usergrid.persistence.graph.GraphManagerFactory; import org.apache.usergrid.persistence.graph.SearchByEdgeType; import org.apache.usergrid.persistence.graph.impl.SimpleEdge; import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdge; @@ -106,6 +111,7 @@ import org.apache.usergrid.utils.MapUtils; import org.apache.usergrid.utils.UUIDUtils; import com.codahale.metrics.Timer; +import com.google.common.base.Optional; import com.google.common.base.Preconditions; import me.prettyprint.hector.api.Keyspace; @@ -167,9 +173,9 @@ public class CpRelationManager implements RelationManager { private static final Logger logger = LoggerFactory.getLogger( CpRelationManager.class ); - private CpEntityManagerFactory emf; - private ManagerCache managerCache; + private EntityCollectionManagerFactory entityCollectionManagerFactory; + private GraphManagerFactory graphManagerFactory; private EntityManager em; @@ -196,15 +202,16 @@ public class CpRelationManager implements RelationManager { public CpRelationManager init( - EntityManager em, - CpEntityManagerFactory emf, + EntityManager em, UUID applicationId, EntityRef headEntity, IndexBucketLocator indexBucketLocator, - MetricsFactory metricsFactory) { + MetricsFactory metricsFactory, + ManagerCache managerCache, + EntityCollectionManagerFactory entityCollectionManagerFactory, + GraphManagerFactory graphManagerFactory ) { Assert.notNull( em, "Entity manager cannot be null" ); - Assert.notNull( emf, "Entity manager factory cannot be null" ); Assert.notNull( applicationId, "Application Id cannot be null" ); Assert.notNull( headEntity, "Head entity cannot be null" ); Assert.notNull( headEntity.getUuid(), "Head entity uuid cannot be null" ); @@ -213,10 +220,9 @@ public class CpRelationManager implements RelationManager { //Assert.notNull( 
indexBucketLocator, "indexBucketLocator cannot be null" ); this.em = em; - this.emf = emf; this.applicationId = applicationId; this.headEntity = headEntity; - this.managerCache = emf.getManagerCache(); + this.managerCache = managerCache; this.applicationScope = CpNamingUtils.getApplicationScope( applicationId ); this.cass = em.getCass(); // TODO: eliminate need for this via Core Persistence @@ -232,6 +238,8 @@ public class CpRelationManager implements RelationManager { // load the Core Persistence version of the head entity as well this.headEntityScope = getCollectionScopeNameFromEntityType( applicationScope.getApplication(), headEntity.getType()); + this.entityCollectionManagerFactory = entityCollectionManagerFactory; + this.graphManagerFactory = graphManagerFactory; if ( logger.isDebugEnabled() ) { logger.debug( "Loading head entity {}:{} from scope\n app {}\n owner {}\n name {}", @@ -331,7 +339,7 @@ public class CpRelationManager implements RelationManager { public Observable<Edge> call( final String edgeType ) { return gm.loadEdgesToTarget( new SimpleSearchByEdgeType( cpHeadEntity.getId(), edgeType, Long.MAX_VALUE, - SearchByEdgeType.Order.DESCENDING, null ) ); + SearchByEdgeType.Order.DESCENDING, Optional.<Edge>absent() ) ); } } ); @@ -400,7 +408,7 @@ public class CpRelationManager implements RelationManager { public Observable<Edge> call( final String etype ) { return gm.loadEdgesToTarget( new SimpleSearchByEdgeType( cpHeadEntity.getId(), etype, Long.MAX_VALUE, - SearchByEdgeType.Order.DESCENDING, null ) ); + SearchByEdgeType.Order.DESCENDING, Optional.<Edge>absent() ) ); } } ) @@ -516,7 +524,7 @@ public class CpRelationManager implements RelationManager { CpNamingUtils.getEdgeTypeFromConnectionType( connectionType ), System.currentTimeMillis(), SearchByEdgeType.Order.DESCENDING, - null ) ); // last + Optional.<Edge>absent() ) ); // last Iterator<Edge> iterator = edgesToTarget.toBlockingObservable().getIterator(); int count = 0; @@ -541,7 +549,7 @@ public 
class CpRelationManager implements RelationManager { CpNamingUtils.getEdgeTypeFromConnectionType( connectionType ), System.currentTimeMillis(), SearchByEdgeType.Order.DESCENDING, - null ) ); // last + Optional.<Edge>absent() ) ); // last int count = edgesFromSource.take( 2 ).count().toBlocking().last(); @@ -927,6 +935,21 @@ public class CpRelationManager implements RelationManager { + "' of " + headEntity.getType() + ":" + headEntity .getUuid() ); } + query.setEntityType( collection.getType() ); + query = adjustQuery( query ); + + /** + * It's a graph query, execute the graph query executor. + * + * TODO refactor all of this away into commands + */ + if(query.isGraphQuery()){ + final QueryExecutor executor = new CollectionGraphQueryExecutor( this.entityCollectionManagerFactory,graphManagerFactory, applicationScope, headEntity, query.getCursor(), collName, query.getLimit() ); + return executor.next(); + } + + + final IndexScope indexScope = new IndexScopeImpl( cpHeadEntity.getId(), CpNamingUtils.getCollectionScopeNameFromCollectionName( collName ) ); @@ -939,9 +962,6 @@ public class CpRelationManager implements RelationManager { indexScope.getOwner().toString(), indexScope.getName() ); - query.setEntityType( collection.getType() ); - query = adjustQuery( query ); - final CollectionResultsLoaderFactoryImpl resultsLoaderFactory = new CollectionResultsLoaderFactoryImpl( managerCache ); @@ -1362,6 +1382,21 @@ public class CpRelationManager implements RelationManager { headEntity = em.validate( headEntity ); + + query = adjustQuery( query ); + + + /** + * It's a graph query, execute the graph query executor. 
+ * + * TODO refactor all of this away into commands + */ + if(query.isGraphQuery()){ + final QueryExecutor executor = new ConnectionGraphQueryExecutor( this.entityCollectionManagerFactory,graphManagerFactory, applicationScope, headEntity, query.getCursor(), connection, query.getLimit() ); + return executor.next(); + } + + final IndexScope indexScope = new IndexScopeImpl( cpHeadEntity.getId(), CpNamingUtils.getConnectionScopeName( connection ) ); @@ -1373,7 +1408,6 @@ public class CpRelationManager implements RelationManager { indexScope.getOwner().toString(), indexScope.getName(), searchTypes } ); - query = adjustQuery( query ); final ConnectionResultsLoaderFactoryImpl resultsLoaderFactory = new ConnectionResultsLoaderFactoryImpl( managerCache, headEntity, connection ); @@ -1381,9 +1415,6 @@ public class CpRelationManager implements RelationManager { final QueryExecutor executor = new ElasticSearchQueryExecutor(resultsLoaderFactory, ei, applicationScope, indexScope, searchTypes, query); return executor.next(); -// CandidateResults crs = ei.search( indexScope, searchTypes, query ); - -// return buildConnectionResults( indexScope, query, crs, connection ); } @@ -1455,7 +1486,7 @@ public class CpRelationManager implements RelationManager { private CpRelationManager getRelationManager( EntityRef headEntity ) { CpRelationManager rmi = new CpRelationManager(); - rmi.init( em, emf, applicationId, headEntity, null, metricsFactory); + rmi.init( em, applicationId, headEntity, null, metricsFactory, managerCache, entityCollectionManagerFactory, graphManagerFactory); return rmi; } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6098d9b6/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpWalker.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpWalker.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpWalker.java index 
4b902d8..b2354a6 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpWalker.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpWalker.java @@ -31,6 +31,8 @@ import org.apache.usergrid.persistence.graph.impl.SimpleSearchEdgeType; import org.apache.usergrid.persistence.model.entity.Id; import org.apache.usergrid.persistence.model.entity.SimpleId; +import com.google.common.base.Optional; + import rx.Observable; import rx.functions.Action1; import rx.functions.Func1; @@ -111,7 +113,7 @@ public class CpWalker { logger.debug( "Loading edges of type {} from node {}", edgeType, applicationId ); return gm.loadEdgesFromSource( new SimpleSearchByEdgeType( - applicationId, edgeType, Long.MAX_VALUE, order , null ) ); + applicationId, edgeType, Long.MAX_VALUE, order , Optional.<Edge>absent() ) ); } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6098d9b6/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/CommandBuilder.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/CommandBuilder.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/CommandBuilder.java new file mode 100644 index 0000000..de76f50 --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/CommandBuilder.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.corepersistence.command; + + +import org.apache.usergrid.corepersistence.command.cursor.RequestCursor; +import org.apache.usergrid.corepersistence.command.cursor.ResponseCursor; +import org.apache.usergrid.corepersistence.command.read.CollectCommand; +import org.apache.usergrid.corepersistence.command.read.Command; +import org.apache.usergrid.corepersistence.command.read.TraverseCommand; +import org.apache.usergrid.persistence.core.scope.ApplicationScope; +import org.apache.usergrid.persistence.model.entity.Id; + +import com.google.common.base.Optional; + +import rx.Observable; + + +/** + * A builder that will allow us to build a traversal command for execution + */ +public class CommandBuilder { + + + private final ApplicationScope applicationScope; + private final RequestCursor requestCursor; + private final ResponseCursor responseCursor; + private final int limit; + + private int count = 0; + + private Observable<Id> currentObservable; + + + /** + * Our first pass, where we implement our start point as an Id until we can use this to perform our entire + * traversal. 
Eventually as we untangle the existing Query service nightmare, the sourceId will be removed and should + * only be traversed from the root application + */ + public CommandBuilder( final ApplicationScope applicationScope, final Id sourceId, + final Optional<String> requestCursor, final int limit ) { + + this.applicationScope = applicationScope; + this.limit = limit; + + //set the request cursor + this.requestCursor = new RequestCursor( requestCursor ); + + //set the response cursor + this.responseCursor = new ResponseCursor(); + + + this.currentObservable = Observable.just( sourceId ); + } + + + /** + * Add a read command that will read Ids and produce Ids. This is an intermediate traversal operation + */ + public CommandBuilder withTraverseCommand( final TraverseCommand traverseCommand ) { + + setState( traverseCommand ); + + this.currentObservable = currentObservable.compose( traverseCommand ); + + return this; + } + + + /** + * Build the final collection step, and return the resulting observable + */ + public <T> Observable<T> build( final CollectCommand<T> collectCommand ) { + setState( collectCommand ); + + collectCommand.setLimit( limit ); + + return currentObservable.compose( collectCommand ); + } + + + /** + * Set the id of the state + * @param command + */ + private void setState( Command<?> command ) { + command.setId( count ); + //done for clarity + count++; + + command.setCursorCaches( requestCursor, responseCursor ); + command.setApplicationScope( applicationScope ); + } +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6098d9b6/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/cursor/CursorSerializer.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/cursor/CursorSerializer.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/cursor/CursorSerializer.java new file mode 100644 index 0000000..b45e7da --- /dev/null +++
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/cursor/CursorSerializer.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.corepersistence.command.cursor; + + +import java.io.Serializable; + +import com.fasterxml.jackson.core.Base64Variant; +import com.fasterxml.jackson.core.Base64Variants; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.dataformat.smile.SmileFactory; + + +/** + * A utility to serialize objects to/from cursors + */ +public class CursorSerializer { + + + private static final SmileFactory SMILE_FACTORY = new SmileFactory(); + + private static final ObjectMapper MAPPER = new ObjectMapper( SMILE_FACTORY ); + + private static final Base64Variant VARIANT = Base64Variants.MODIFIED_FOR_URL; + + + /** + * Serialize the serializable object as a cursor + */ + public static String asCursor( final Serializable cursor ) { + + try { + return MAPPER.writer( VARIANT ).writeValueAsString( cursor ); + } + catch ( JsonProcessingException e ) { + throw new CursorParseException( "Unable to serialize 
cursor", e ); + } + } + + + /** + * Deserialize from the cursor + * @param cursor + * @return + * @throws CursorParseException + */ + public <T extends Serializable> T fromCursor( final String cursor, final Class<T> cursorClass ) throws CursorParseException { + try { + + final JsonParser parser = MAPPER.getFactory().createParser( cursor ); + return MAPPER.reader( VARIANT ).readValue( parser, cursorClass); + } + catch ( Exception e ) { + throw new CursorParseException( "Unable to serialize cursor", e ); + } + } + + + /** + * Thrown when we can't parse a cursor + */ + public static class CursorParseException extends RuntimeException { + public CursorParseException( final String message, final Throwable cause ) { + super( message, cause ); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6098d9b6/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/cursor/RequestCursor.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/cursor/RequestCursor.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/cursor/RequestCursor.java new file mode 100644 index 0000000..60d54ff --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/cursor/RequestCursor.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.corepersistence.command.cursor; + + +import java.io.Serializable; + +import com.google.common.base.Optional; + + +/** + * A cursor that has been passed in with our request. Adds utils for parsing values + */ +public class RequestCursor { + + public RequestCursor(final Optional<String> cursor){ + + } + + + /** + * Get the cursor with the specified id + */ + public <T extends Serializable> T getCursor( final int id, final Class<T> cursorType ) { + return null; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6098d9b6/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/cursor/ResponseCursor.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/cursor/ResponseCursor.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/cursor/ResponseCursor.java new file mode 100644 index 0000000..02aae34 --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/cursor/ResponseCursor.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.corepersistence.command.cursor; + + +import java.io.Serializable; +import java.util.HashMap; +import java.util.Map; + + +/** + * A cursor used in rendering a response. In this revision ensureCapacity is an empty placeholder and encodeAsString always returns null + */ +public class ResponseCursor { + + + + /** + * Cursor values keyed by id. We use a map b/c some indexes might be skipped + */ + private Map<Integer, ? super Serializable> cursors = new HashMap<>(); + + /** + * Store the possible cursor value under the given id. The value is kept as-is; it is NOT parsed or encoded here. This is intentional for performance + */ + public <T extends Serializable> void setCursor( final int id, final T cursor ) { + cursors.put( id, cursor ); + } + + + private void ensureCapacity() { + + } + + + public String encodeAsString() { + return null; + } +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6098d9b6/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/AbstractCommand.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/AbstractCommand.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/AbstractCommand.java new file mode 100644 index 0000000..59e1848 --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/AbstractCommand.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.corepersistence.command.read; + + +import java.io.Serializable; + +import javax.xml.ws.Response; + +import org.apache.usergrid.corepersistence.command.cursor.RequestCursor; +import org.apache.usergrid.corepersistence.command.cursor.ResponseCursor; +import org.apache.usergrid.persistence.core.scope.ApplicationScope; + +import com.google.common.base.Optional; + + +/** + * Basic functionality for our commands to handle cursor IO. T is the type the command emits, C is the serializable cursor type it reads and writes + */ +public abstract class AbstractCommand<T, C extends Serializable> implements Command<T> { + + private int id; + /** + * The cache of the cursor that was set when the read was started + */ + private RequestCursor readCache; + + /** + * The current state of the write cache. 
Gets updated as we traverse the observables + */ + private ResponseCursor writeCache; + + + /** + * The applicationScope + */ + protected ApplicationScope applicationScope; + + + @Override + public void setId( final int id ) { + this.id = id; + } + + + @Override + public void setCursorCaches( final RequestCursor readCache, final ResponseCursor writeCache ) { + this.readCache = readCache; + this.writeCache = writeCache; + } + + + @Override + public void setApplicationScope( final ApplicationScope applicationScope ) { + this.applicationScope = applicationScope; + } + + + /** + * Return the parsed value of the cursor from the last request, looked up by this command's id. Absent when the read cache returns null + */ + protected Optional<C> getCursor() { + final C cursor = readCache.getCursor( id, getCursorClass() ); + + return Optional.fromNullable( cursor ); + } + + + + + + /** + * Set the cursor value into the new cursor write cache + * @param newValue the cursor value to store under this command's id + */ + protected void setCursor(final C newValue){ + writeCache.setCursor( id, newValue ); + } + + + /** + * Generate our state as a cursor + * @return the write cache encoded as a string + */ + protected String generateCursor(){ + return writeCache.encodeAsString(); + } + + /** + * Return the class to be used when parsing the cursor + */ + protected abstract Class<C> getCursorClass(); + +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6098d9b6/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/CollectCommand.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/CollectCommand.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/CollectCommand.java new file mode 100644 index 0000000..c2ad931 --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/CollectCommand.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.corepersistence.command.read; + + +/** + * A command that is used to reduce our stream of results into a final output + * @param <T> the type of the final output emitted by the command + */ +public interface CollectCommand<T> extends Command<T>{ + + /** + * Set the preferred result size for the command + * @param resultSize the preferred number of results + */ + void setLimit( final int resultSize ); + + +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6098d9b6/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/Command.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/Command.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/Command.java new file mode 100644 index 0000000..ada47d2 --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/Command.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.corepersistence.command.read; + + +import org.apache.usergrid.corepersistence.command.cursor.RequestCursor; +import org.apache.usergrid.corepersistence.command.cursor.ResponseCursor; +import org.apache.usergrid.persistence.core.scope.ApplicationScope; +import org.apache.usergrid.persistence.model.entity.Id; + +import rx.Observable; + + +/** + * Interface for a read command. 
This takes an input of Id, performs some operation, and emits T for further + * processing + */ +public interface Command<T> extends Observable.Transformer<Id, T> { + + + /** + * Set the id of this command in its execution environment + */ + void setId( final int id ); + + /** + * Set the cursor cache into the command + * + * @param readCache Set the cache that was used in the request + * @param writeCache Set the cache to be used when writing the results + */ + void setCursorCaches( final RequestCursor readCache, final ResponseCursor writeCache ); + + /** + * Set the application scope of the command + * @param applicationScope the application scope the command should use + */ + void setApplicationScope(final ApplicationScope applicationScope); +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6098d9b6/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/TraverseCommand.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/TraverseCommand.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/TraverseCommand.java new file mode 100644 index 0000000..c8fe2f8 --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/TraverseCommand.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.corepersistence.command.read; + + +import org.apache.usergrid.persistence.model.entity.Id; + + +/** + * Traverses edges in the graph, either by query or graph traversal. Takes an observable of ids, and emits + * an observable of ids + */ +public interface TraverseCommand extends Command<Id> {} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6098d9b6/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/entity/EntityLoadCommand.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/entity/EntityLoadCommand.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/entity/EntityLoadCommand.java new file mode 100644 index 0000000..f4de9e0 --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/entity/EntityLoadCommand.java @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.corepersistence.command.read.entity; + + +import java.io.Serializable; +import java.util.List; +import java.util.Map; + +import org.apache.usergrid.corepersistence.command.read.AbstractCommand; +import org.apache.usergrid.corepersistence.command.read.CollectCommand; +import org.apache.usergrid.corepersistence.util.CpEntityMapUtils; +import org.apache.usergrid.persistence.EntityFactory; +import org.apache.usergrid.persistence.Results; +import org.apache.usergrid.persistence.collection.CollectionScope; +import org.apache.usergrid.persistence.collection.EntityCollectionManager; +import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory; +import org.apache.usergrid.persistence.collection.EntitySet; +import org.apache.usergrid.persistence.collection.MvccEntity; +import org.apache.usergrid.persistence.core.scope.ApplicationScope; +import org.apache.usergrid.persistence.model.entity.Entity; +import org.apache.usergrid.persistence.model.entity.Id; + +import rx.Observable; +import rx.functions.Func1; +import rx.observables.GroupedObservable; + +import static org.apache.usergrid.corepersistence.util.CpNamingUtils.getCollectionScopeNameFromEntityType; + + +/** + * Loads entities from a set of Ids. 
+ * Ids are buffered by the configured limit and one Results is emitted per buffer; entities that are not present are dropped. + * TODO refactor this into a common command that both ES search and graphSearch can use for repair and verification + */ +public class EntityLoadCommand extends AbstractCommand<Results, Serializable> implements CollectCommand<Results> { + + private final EntityCollectionManagerFactory entityCollectionManagerFactory; + + //TODO get rid of this when merged into 2.0 dev + private final ApplicationScope applicationScope; + private int resultSize; + + + public EntityLoadCommand( final EntityCollectionManagerFactory entityCollectionManagerFactory, + final ApplicationScope applicationScope ) { + this.entityCollectionManagerFactory = entityCollectionManagerFactory; + this.applicationScope = applicationScope; + } + + //returns null: this command never calls getCursor(), it only writes the generated cursor into the Results + @Override + protected Class<Serializable> getCursorClass() { + return null; + } + + + @Override + public Observable<Results> call( final Observable<? extends Id> observable ) { + + + /** + * A bit kludgy from old 1.0 -> 2.0 apis. Refactor this as we clean up our lower levels and create new results + * objects + */ + + + return observable.buffer( resultSize ).flatMap( new Func1<List<? extends Id>, Observable<Results>>() { + @Override + public Observable<Results> call( final List<? 
extends Id> ids ) { + + return Observable.from( ids ) + //group them by type so we can load them, in 2.0 dev this step will be removed + .groupBy( new Func1<Id, String>() { + @Override + public String call( final Id id ) { + return id.getType(); + } + } ) + + //take all our groups and load them as id sets + + .flatMap( new Func1<GroupedObservable<String, Id>, Observable<EntitySet>>() { + @Override + public Observable<EntitySet> call( + final GroupedObservable<String, Id> stringIdGroupedObservable ) { + + + final String entityType = stringIdGroupedObservable.getKey(); + + final CollectionScope collectionScope = + getCollectionScopeNameFromEntityType( applicationScope.getApplication(), entityType ); + + + return stringIdGroupedObservable.toList() + .flatMap( new Func1<List<Id>, Observable<EntitySet>>() { + @Override + public Observable<EntitySet> call( + final List<Id> ids ) { + + final EntityCollectionManager ecm = + entityCollectionManagerFactory + .createCollectionManager( collectionScope ); + return ecm.load( ids ); + } + } ); + } + } ) + //emit our groups of entities as a stream of entities + .flatMap( new Func1<EntitySet, Observable<org.apache.usergrid.persistence.Entity>>() { + @Override + public Observable<org.apache.usergrid.persistence.Entity> call( final EntitySet entitySet ) { + //emit our entities, and filter out deleted entities + return Observable.from( entitySet.getEntities() ).map( + new Func1<MvccEntity, org.apache.usergrid.persistence.Entity>() { + + @Override + public org.apache.usergrid.persistence.Entity call( final MvccEntity mvccEntity ) { + return mapEntity( mvccEntity ); + } + } ) + //keep only non-null entities; mapEntity returns null when the entity is not present + .filter( new Func1<org.apache.usergrid.persistence.Entity, Boolean>() { + @Override + public Boolean call( final org.apache.usergrid.persistence.Entity entity ) { + return entity != null; + } + } ); + } + } ) + + //convert them to a list, then map them into results + .toList().map( new Func1<List<org.apache.usergrid.persistence.Entity>, 
Results>() { + @Override + public Results call( final List<org.apache.usergrid.persistence.Entity> entities ) { + final Results results = Results.fromEntities( entities ); + results.setCursor( generateCursor() ); + + return results; + } + } ); + } + } ); + } + + + + /** + * Map a new cp entity to an old entity. Returns null if the cp entity is not present + */ + private org.apache.usergrid.persistence.Entity mapEntity( final MvccEntity mvccEntity ) { + if ( !mvccEntity.getEntity().isPresent() ) { + return null; + } + + + final Entity cpEntity = mvccEntity.getEntity().get(); + final Id entityId = cpEntity.getId(); + + org.apache.usergrid.persistence.Entity entity = + EntityFactory.newEntity( entityId.getUuid(), entityId.getType() ); + + Map<String, Object> entityMap = CpEntityMapUtils.toMap( cpEntity ); + entity.addProperties( entityMap ); + + return entity; + } + + + @Override + public void setLimit( final int resultSize ) { + this.resultSize = resultSize; + } +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6098d9b6/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/graph/AbstractReadGraphCommand.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/graph/AbstractReadGraphCommand.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/graph/AbstractReadGraphCommand.java new file mode 100644 index 0000000..141fb06 --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/graph/AbstractReadGraphCommand.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.corepersistence.command.read.graph; + + +import org.apache.usergrid.corepersistence.command.read.AbstractCommand; +import org.apache.usergrid.corepersistence.command.read.TraverseCommand; +import org.apache.usergrid.persistence.graph.Edge; +import org.apache.usergrid.persistence.graph.GraphManager; +import org.apache.usergrid.persistence.graph.GraphManagerFactory; +import org.apache.usergrid.persistence.graph.SearchByEdgeType; +import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType; +import org.apache.usergrid.persistence.model.entity.Id; + +import com.google.common.base.Optional; + +import rx.Observable; +import rx.functions.Action1; +import rx.functions.Func1; + +import static org.apache.usergrid.corepersistence.util.CpNamingUtils.getCollectionScopeNameFromCollectionName; + + +/** + * Command for reading graph edges. For each source id it loads the edges of the type named by getEdgeTypeName() and emits their target nodes + */ +public abstract class AbstractReadGraphCommand extends AbstractCommand<Id, Edge> implements TraverseCommand { + + private final GraphManagerFactory graphManagerFactory; + + + /** + * Create a new instance of our command + * @param graphManagerFactory factory used to create the graph manager for the application scope + */ + public AbstractReadGraphCommand( final GraphManagerFactory graphManagerFactory ) { + this.graphManagerFactory = graphManagerFactory; + } + + + @Override + public Observable<Id> call( final Observable<? 
extends Id> observable ) { + + //get the graph manager + final GraphManager graphManager = graphManagerFactory.createEdgeManager( applicationScope ); + + //set up our constant state + final Optional<Edge> startFromCursor = getCursor(); + + final String edgeName = getEdgeTypeName(); + + + //return the target ids of all edges that are emitted from each source id + return observable.flatMap( new Func1<Id, Observable<Id>>() { + + @Override + public Observable<Id> call( final Id id ) { + + final SimpleSearchByEdgeType search = new SimpleSearchByEdgeType(id,edgeName, Long.MAX_VALUE, + SearchByEdgeType.Order.DESCENDING, startFromCursor ); + + /** + * TODO, pass a message with pointers to our cursor values to be generated later + */ + return graphManager.loadEdgesFromSource( search ).doOnNext( new Action1<Edge>() { + @Override + public void call( final Edge edge ) { + setCursor( edge ); + } + } ).map( new Func1<Edge, Id>() { + @Override + public Id call( final Edge edge ) { + return edge.getTargetNode(); + } + } ); + } + } ); + } + + + @Override + protected Class<Edge> getCursorClass() { + return Edge.class; + } + + + + /** + * Get the edge type name we should use when traversing + * @return the edge type name passed to the edge search + */ + protected abstract String getEdgeTypeName(); +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6098d9b6/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/graph/ReadGraphCollectionCommand.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/graph/ReadGraphCollectionCommand.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/graph/ReadGraphCollectionCommand.java new file mode 100644 index 0000000..9ccb969 --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/graph/ReadGraphCollectionCommand.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more 
contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.corepersistence.command.read.graph; + + +import org.apache.usergrid.persistence.graph.GraphManagerFactory; + +import static org.apache.usergrid.corepersistence.util.CpNamingUtils.getCollectionScopeNameFromCollectionName; + + +/** + * Command for reading graph edges on a collection + */ +public class ReadGraphCollectionCommand extends AbstractReadGraphCommand { + + private final String collectionName; + + + /** + * Create a new instance of our command that traverses the edge type derived from collectionName + */ + public ReadGraphCollectionCommand( final GraphManagerFactory graphManagerFactory, final String collectionName ) { + super( graphManagerFactory ); + this.collectionName = collectionName; + } + + + @Override + protected String getEdgeTypeName() { + return getCollectionScopeNameFromCollectionName( collectionName ); + } +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6098d9b6/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/graph/ReadGraphConnectionCommand.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/graph/ReadGraphConnectionCommand.java 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/graph/ReadGraphConnectionCommand.java new file mode 100644 index 0000000..adebd45 --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/command/read/graph/ReadGraphConnectionCommand.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.usergrid.corepersistence.command.read.graph; + + +import org.apache.usergrid.persistence.graph.GraphManagerFactory; + +import static org.apache.usergrid.corepersistence.util.CpNamingUtils.getConnectionScopeName; + + +/** + * Command for reading graph edges on a connection + */ +public class ReadGraphConnectionCommand extends AbstractReadGraphCommand { + + private final String connectionName; + + + /** + * Create a new instance of our command that traverses the edge type derived from connectionName + */ + public ReadGraphConnectionCommand( final GraphManagerFactory graphManagerFactory, final String connectionName ) { + super( graphManagerFactory ); + this.connectionName = connectionName; + } + + + @Override + protected String getEdgeTypeName() { + return getConnectionScopeName( connectionName ); + } +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6098d9b6/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/AbstractGraphQueryExecutor.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/AbstractGraphQueryExecutor.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/AbstractGraphQueryExecutor.java new file mode 100644 index 0000000..0bea013 --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/AbstractGraphQueryExecutor.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.corepersistence.results; + + +import java.util.Iterator; +import java.util.NoSuchElementException; + +import org.apache.usergrid.corepersistence.command.CommandBuilder; +import org.apache.usergrid.corepersistence.command.read.entity.EntityLoadCommand; +import org.apache.usergrid.persistence.EntityRef; +import org.apache.usergrid.persistence.Results; +import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory; +import org.apache.usergrid.persistence.core.scope.ApplicationScope; +import org.apache.usergrid.persistence.model.entity.Id; +import org.apache.usergrid.persistence.model.entity.SimpleId; + +import com.google.common.base.Optional; + +import rx.Observable; + + +/** + * This class is a nasty hack to bridge 2.0 observables into 1.0 iterators. DO NOT use this as a model for moving + * forward, pandas will die. 
+ */ +public abstract class AbstractGraphQueryExecutor implements QueryExecutor { + + + private final EntityCollectionManagerFactory entityCollectionManagerFactory; + private final ApplicationScope applicationScope; + private final Id sourceId; + private final int limit; + + private final Optional<String> requestCursor; + + private Iterator<Results> observableIterator; + + + public AbstractGraphQueryExecutor( final EntityCollectionManagerFactory entityCollectionManagerFactory, + final ApplicationScope applicationScope, final EntityRef source, + final String cursor, final int limit ) { + + this.entityCollectionManagerFactory = entityCollectionManagerFactory; + this.applicationScope = applicationScope; + this.limit = limit; + this.sourceId = new SimpleId( source.getUuid(), source.getType() ); + + this.requestCursor = Optional.fromNullable( cursor ); + } + + + @Override + public Iterator<Results> iterator() { + return this; + } + + + @Override + public boolean hasNext() { + + //hasn't been set up yet, run through our first setup + if ( observableIterator == null ) { + //assign them to an iterator. this now uses an internal buffer with backpressure, so we won't load all + // results + //set up our command builder + final CommandBuilder commandBuilder = new CommandBuilder( applicationScope, sourceId, requestCursor, limit ); + + + addTraverseCommand( commandBuilder ); + + //construct our results to be observed later. 
This is a cold observable + final Observable<Results> resultsObservable = + commandBuilder.build( new EntityLoadCommand( entityCollectionManagerFactory, applicationScope ) ); + + this.observableIterator = resultsObservable.toBlocking().getIterator(); + } + + + //delegate to the blocking iterator to see if more results are available + return observableIterator.hasNext(); + } + + + @Override + public Results next() { + if ( !hasNext() ) { + throw new NoSuchElementException( "No more results present" ); + } + + final Results results = observableIterator.next(); + + //ugly and tight coupling, but we don't have a choice until we finish some refactoring + results.setQueryExecutor( this ); + + return results; + } + + + @Override + public void remove() { + throw new UnsupportedOperationException( "Remove not implemented!!" ); + } + + + /** + * Add the traverse command to the graph + */ + protected abstract void addTraverseCommand( final CommandBuilder commandBuilder ); +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6098d9b6/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/CollectionGraphQueryExecutor.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/CollectionGraphQueryExecutor.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/CollectionGraphQueryExecutor.java new file mode 100644 index 0000000..651c27e --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/CollectionGraphQueryExecutor.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.corepersistence.results; + + +import org.apache.usergrid.corepersistence.command.CommandBuilder; +import org.apache.usergrid.corepersistence.command.read.graph.ReadGraphCollectionCommand; +import org.apache.usergrid.persistence.EntityRef; +import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory; +import org.apache.usergrid.persistence.core.scope.ApplicationScope; +import org.apache.usergrid.persistence.graph.GraphManagerFactory; + +import com.google.common.base.Preconditions; + + +/** + * Create The collection graph query executor + */ +public class CollectionGraphQueryExecutor extends AbstractGraphQueryExecutor { + + private final GraphManagerFactory graphManagerFactory; + private final String collectionName; + + + public CollectionGraphQueryExecutor( final EntityCollectionManagerFactory entityCollectionManagerFactory, + final GraphManagerFactory graphManagerFactory, + final ApplicationScope applicationScope, final EntityRef source, + final String cursor, final String connectionName, final int limit ) { + + super( entityCollectionManagerFactory, applicationScope, source, cursor, limit ); + this.graphManagerFactory = graphManagerFactory; + + Preconditions.checkNotNull( connectionName, "connectionName is required on the query" ); + this.collectionName = connectionName; + } + + + @Override + protected void addTraverseCommand( final CommandBuilder commandBuilder ) { + //set the traverse command from the source Id to the connect name + commandBuilder.withTraverseCommand( new 
ReadGraphCollectionCommand( graphManagerFactory, collectionName ) ); + } +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6098d9b6/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/CollectionResultsLoaderFactoryImpl.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/CollectionResultsLoaderFactoryImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/CollectionResultsLoaderFactoryImpl.java index b79700b..4b43142 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/CollectionResultsLoaderFactoryImpl.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/CollectionResultsLoaderFactoryImpl.java @@ -48,7 +48,6 @@ public class CollectionResultsLoaderFactoryImpl implements ResultsLoaderFactory verifier = new CollectionRefsVerifier(); } else if ( resultsLevel == Query.Level.IDS ) { -// verifier = new RefsVerifier(); verifier = new IdsVerifier(); } else { http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6098d9b6/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ConnectionGraphQueryExecutor.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ConnectionGraphQueryExecutor.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ConnectionGraphQueryExecutor.java new file mode 100644 index 0000000..61726b7 --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ConnectionGraphQueryExecutor.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.corepersistence.results; + + +import org.apache.usergrid.corepersistence.command.CommandBuilder; +import org.apache.usergrid.corepersistence.command.read.graph.ReadGraphConnectionCommand; +import org.apache.usergrid.persistence.EntityRef; +import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory; +import org.apache.usergrid.persistence.core.scope.ApplicationScope; +import org.apache.usergrid.persistence.graph.GraphManagerFactory; + +import com.google.common.base.Preconditions; + + +public class ConnectionGraphQueryExecutor extends AbstractGraphQueryExecutor { + + private final GraphManagerFactory graphManagerFactory; + private final String connectionName; + + + public ConnectionGraphQueryExecutor( final EntityCollectionManagerFactory entityCollectionManagerFactory, + final GraphManagerFactory graphManagerFactory, + final ApplicationScope applicationScope, final EntityRef source, + final String cursor, final String connectionType, final int limit) { + + super( entityCollectionManagerFactory, applicationScope, source, cursor, limit ); + this.graphManagerFactory = graphManagerFactory; + + Preconditions.checkNotNull(connectionType, "connectionType is required on the query" ); + this.connectionName = connectionType; + } + + + + @Override + protected void addTraverseCommand( final CommandBuilder commandBuilder ) { + //set the 
traverse command from the source Id to the connect name + commandBuilder.withTraverseCommand( new ReadGraphConnectionCommand( graphManagerFactory, connectionName ) ); + } +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6098d9b6/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/ApplicationObservable.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/ApplicationObservable.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/ApplicationObservable.java index 6019bca..988cd3b 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/ApplicationObservable.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/ApplicationObservable.java @@ -39,6 +39,8 @@ import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType; import org.apache.usergrid.persistence.model.entity.Entity; import org.apache.usergrid.persistence.model.entity.Id; +import com.google.common.base.Optional; + import rx.Observable; import rx.functions.Func1; @@ -89,37 +91,37 @@ public class ApplicationObservable { //we have app infos. 
For each of these app infos, we have to load the application itself Observable<Id> appIds = gm.loadEdgesFromSource( new SimpleSearchByEdgeType( rootAppId, edgeType, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING, - null ) ).flatMap( new Func1<Edge, Observable<Id>>() { + Optional.<Edge>absent() ) ).flatMap( new Func1<Edge, Observable<Id>>() { @Override public Observable<Id> call( final Edge edge ) { //get the app info and load it final Id appInfo = edge.getTargetNode(); return collectionManager.load( appInfo ) - //filter out null entities - .filter( new Func1<Entity, Boolean>() { - @Override - public Boolean call( final Entity entity ) { - if ( entity == null ) { - logger.warn( "Encountered a null application info for id {}", appInfo ); - return false; - } - - return true; + //filter out null entities + .filter( new Func1<Entity, Boolean>() { + @Override + public Boolean call( final Entity entity ) { + if ( entity == null ) { + logger.warn( "Encountered a null application info for id {}", appInfo ); + return false; } - } ) - //get the id from the entity - .map( new Func1<org.apache.usergrid.persistence.model.entity.Entity, Id>() { + return true; + } + } ) + //get the id from the entity + .map( new Func1<org.apache.usergrid.persistence.model.entity.Entity, Id>() { - @Override - public Id call( final org.apache.usergrid.persistence.model.entity.Entity entity ) { - final UUID uuid = ( UUID ) entity.getField( "applicationUuid" ).getValue(); + @Override + public Id call( final org.apache.usergrid.persistence.model.entity.Entity entity ) { - return CpNamingUtils.generateApplicationId( uuid ); - } - } ); + final UUID uuid = ( UUID ) entity.getField( "applicationUuid" ).getValue(); + + return CpNamingUtils.generateApplicationId( uuid ); + } + } ); } } ); http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6098d9b6/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/EdgesFromSourceObservable.java 
---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/EdgesFromSourceObservable.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/EdgesFromSourceObservable.java index d3e2ee5..b23886b 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/EdgesFromSourceObservable.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/EdgesFromSourceObservable.java @@ -31,6 +31,8 @@ import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType; import org.apache.usergrid.persistence.graph.impl.SimpleSearchEdgeType; import org.apache.usergrid.persistence.model.entity.Id; +import com.google.common.base.Optional; + import rx.Observable; import rx.functions.Func1; @@ -56,7 +58,7 @@ public class EdgesFromSourceObservable { logger.debug( "Loading edges of edgeType {} from {}", edgeType, sourceNode ); return gm.loadEdgesFromSource( new SimpleSearchByEdgeType( sourceNode, edgeType, Long.MAX_VALUE, - SearchByEdgeType.Order.DESCENDING, null ) ); + SearchByEdgeType.Order.DESCENDING, Optional.<Edge>absent() ) ); } } ); } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6098d9b6/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/EdgesToTargetObservable.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/EdgesToTargetObservable.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/EdgesToTargetObservable.java index c5dc54d..3130a72 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/EdgesToTargetObservable.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/EdgesToTargetObservable.java @@ -31,6 +31,8 @@ import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType; import 
org.apache.usergrid.persistence.graph.impl.SimpleSearchEdgeType; import org.apache.usergrid.persistence.model.entity.Id; +import com.google.common.base.Optional; + import rx.Observable; import rx.functions.Func1; @@ -56,7 +58,7 @@ public class EdgesToTargetObservable { logger.debug( "Loading edges of edgeType {} to {}", edgeType, targetNode); return gm.loadEdgesToTarget( new SimpleSearchByEdgeType( targetNode, edgeType, Long.MAX_VALUE, - SearchByEdgeType.Order.DESCENDING, null ) ); + SearchByEdgeType.Order.DESCENDING, Optional.<Edge>absent() ) ); } } ); } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6098d9b6/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManager.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManager.java b/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManager.java index 65fac8d..645618c 100644 --- a/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManager.java +++ b/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManager.java @@ -17,6 +17,11 @@ package org.apache.usergrid.persistence; +import org.apache.usergrid.corepersistence.ManagerCache; +import org.apache.usergrid.persistence.cassandra.CounterUtils; +import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory; +import org.apache.usergrid.persistence.core.metrics.MetricsFactory; +import org.apache.usergrid.persistence.graph.GraphManagerFactory; import org.apache.usergrid.persistence.index.query.Query; import java.nio.ByteBuffer; import java.util.Collection; @@ -698,7 +703,7 @@ public interface EntityManager { */ void deleteIndex(); - public void init( EntityManagerFactory emf, UUID applicationId); + public void init( final CassandraService cassandraService, final CounterUtils counterUtils, final MetricsFactory metricsFactory, final GraphManagerFactory graphManagerFactory, final 
EntityCollectionManagerFactory entityCollectionManagerFactory, final ManagerCache managerCache, UUID applicationId); /** For testing purposes */ public void flushManagerCaches();