Merge branch 'two-dot-o-dev' of https://git-wip-us.apache.org/repos/asf/incubator-usergrid into USERGRID-608
# By Todd Nine (5) and Shawn Feldman (2) # Via Shawn Feldman (2) and Todd Nine (1) * 'two-dot-o-dev' of https://git-wip-us.apache.org/repos/asf/incubator-usergrid: observable changes add is application method Updated bi-directional to create 2 directional indexes Finishes testing of connections Updates pipeline and fixes connectionref querying Refactors operations into easier build pattern. Pipeline still need some work. Refactor of pipeline to support type mapping for clarity Conflicts: stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/87963740 Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/87963740 Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/87963740 Branch: refs/heads/USERGRID-669 Commit: 87963740a16c796710c64cb0f67b3195992e9170 Parents: 29a4009 1396ebe Author: GERey <gre...@apigee.com> Authored: Tue May 26 16:06:17 2015 -0700 Committer: GERey <gre...@apigee.com> Committed: Tue May 26 16:06:17 2015 -0700 ---------------------------------------------------------------------- .../corepersistence/CpEntityManager.java | 24 +- .../corepersistence/CpEntityManagerFactory.java | 3 +- .../corepersistence/CpRelationManager.java | 219 ++++++++------ .../corepersistence/index/IndexServiceImpl.java | 57 +--- .../corepersistence/pipeline/Pipeline.java | 93 +++--- .../pipeline/PipelineBuilderFactory.java | 39 --- .../pipeline/PipelineModule.java | 20 +- .../pipeline/PipelineOperation.java | 2 +- .../pipeline/PipelineOperations.java | 30 ++ .../pipeline/builder/CandidateBuilder.java | 67 +++++ .../pipeline/builder/ConnectionRefBuilder.java | 53 ++++ .../pipeline/builder/EntityBuilder.java | 54 ++++ .../pipeline/builder/IdBuilder.java | 151 ++++++++++ .../pipeline/builder/PipelineBuilder.java | 100 +++++++ .../builder/PipelineBuilderFactory.java | 35 +++ .../pipeline/read/AbstractFilter.java | 2 +- .../pipeline/read/AbstractPathFilter.java | 2 +- .../pipeline/read/Collector.java | 38 --- .../pipeline/read/CollectorFactory.java | 38 --- .../corepersistence/pipeline/read/Filter.java | 31 -- .../pipeline/read/FilterFactory.java | 74 +++-- .../pipeline/read/ReadFilterFactoryImpl.java | 136 --------- .../pipeline/read/ReadPipelineBuilder.java | 104 ------- .../pipeline/read/ReadPipelineBuilderImpl.java | 296 ------------------- .../pipeline/read/ResultsPage.java | 10 +- .../read/collect/AbstractCollector.java | 46 --- .../read/collect/ConnectionRefFilter.java | 68 +++++ .../read/collect/ConnectionRefResumeFilter.java | 86 ++++++ .../pipeline/read/collect/EntityFilter.java | 68 ----- .../read/collect/EntityResumeFilter.java | 67 +++++ .../pipeline/read/collect/IdResumeFilter.java | 61 ++++ .../read/collect/ResultsPageCollector.java | 35 ++- .../AbstractElasticSearchFilter.java | 171 ----------- .../pipeline/read/elasticsearch/Candidate.java | 55 ---- .../elasticsearch/CandidateEntityFilter.java | 234 --------------- .../read/elasticsearch/CandidateIdFilter.java | 201 ------------- .../ElasticSearchCollectionFilter.java | 77 ----- .../ElasticSearchConnectionFilter.java | 73 ----- .../ElasticsearchCursorSerializer.java | 42 --- .../read/elasticsearch/Elasticsearchdiagram.jpg | Bin 316655 -> 0 bytes .../graph/AbstractReadGraphEdgeByIdFilter.java | 82 ----- .../read/graph/AbstractReadGraphFilter.java | 147 --------- .../read/graph/EdgeCursorSerializer.java | 42 --- .../pipeline/read/graph/EntityIdFilter.java | 54 ---- .../pipeline/read/graph/EntityLoadFilter.java | 155 ---------- .../pipeline/read/graph/GraphDiagram.jpg | Bin 347711 -> 0 bytes .../graph/ReadGraphCollectionByIdFilter.java | 49 --- .../read/graph/ReadGraphCollectionFilter.java | 53 ---- .../graph/ReadGraphConnectionByIdFilter.java | 50 ---- .../graph/ReadGraphConnectionByTypeFilter.java | 100 ------- .../read/graph/ReadGraphConnectionFilter.java | 53 ---- .../search/AbstractElasticSearchFilter.java | 169 +++++++++++ .../pipeline/read/search/Candidate.java | 55 ++++ .../read/search/CandidateEntityFilter.java | 232 +++++++++++++++ .../pipeline/read/search/CandidateIdFilter.java | 190 ++++++++++++ .../search/ElasticsearchCursorSerializer.java | 40 +++ .../read/search/Elasticsearchdiagram.jpg | Bin 0 -> 316655 bytes .../read/search/SearchCollectionFilter.java | 77 +++++ .../read/search/SearchConnectionFilter.java | 72 +++++ .../AbstractReadGraphEdgeByIdFilter.java | 82 +++++ .../read/traverse/AbstractReadGraphFilter.java | 146 +++++++++ .../read/traverse/EdgeCursorSerializer.java | 42 +++ .../pipeline/read/traverse/EntityIdFilter.java | 53 ++++ .../read/traverse/EntityLoadVerifyFilter.java | 154 ++++++++++ .../pipeline/read/traverse/GraphDiagram.jpg | Bin 0 -> 347711 bytes .../traverse/ReadGraphCollectionByIdFilter.java | 49 +++ .../traverse/ReadGraphCollectionFilter.java | 53 ++++ .../traverse/ReadGraphConnectionByIdFilter.java | 50 ++++ .../ReadGraphConnectionByTypeFilter.java | 99 +++++++ .../traverse/ReadGraphConnectionFilter.java | 53 ++++ .../results/ConnectionRefQueryExecutor.java | 60 ++++ .../results/EntityQueryExecutor.java | 84 ++++++ .../results/ObservableQueryExecutor.java | 52 ++-- .../corepersistence/results/QueryExecutor.java | 1 + .../corepersistence/util/CpNamingUtils.java | 5 + .../pipeline/cursor/CursorTest.java | 4 +- 76 files changed, 2832 insertions(+), 2737 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/87963740/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java ---------------------------------------------------------------------- diff --cc stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java index 2166827,e3bbf23..6d0e772 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java @@@ -167,9 -179,7 +181,9 @@@ public class CpEntityManager implement private final AsyncEventService indexService; - private PipelineBuilderFactory pipelineBuilderFactory; - private final PipelineBuilderFactory filterFactory; ++ private final PipelineBuilderFactory pipelineBuilderFactory; + + private final GraphManagerFactory graphManagerFactory; private boolean skipAggregateCounters; private MetricsFactory metricsFactory; @@@ -211,8 -221,7 +225,9 @@@ */ public CpEntityManager( final CassandraService cass, final CounterUtils counterUtils, final AsyncEventService indexService, final ManagerCache managerCache, final MetricsFactory metricsFactory, final EntityManagerFig entityManagerFig, - final PipelineBuilderFactory pipelineBuilderFactory, final UUID applicationId ) { + final PipelineBuilderFactory pipelineBuilderFactory , + final GraphManagerFactory graphManagerFactory,final UUID applicationId ) { ++ this.entityManagerFig = entityManagerFig; @@@ -221,12 -230,10 +236,13 @@@ Preconditions.checkNotNull( managerCache, "managerCache must not be null" ); Preconditions.checkNotNull( applicationId, "applicationId must not be null" ); Preconditions.checkNotNull( indexService, "indexService must not be null" ); - Preconditions.checkNotNull( pipelineBuilderFactory, "filterFactory must not be null" ); - this.filterFactory = pipelineBuilderFactory; + Preconditions.checkNotNull( pipelineBuilderFactory, "pipelineBuilderFactory must not be null" ); + Preconditions.checkNotNull( graphManagerFactory, "graphManagerFactory must not be null" ); + this.pipelineBuilderFactory = pipelineBuilderFactory; + this.graphManagerFactory = graphManagerFactory; + + this.managerCache = managerCache; this.applicationId = applicationId; this.indexService = indexService; http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/87963740/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java ---------------------------------------------------------------------- diff --cc stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java index b11973e,c9e35f0..5b4af6d --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java @@@ -201,8 -198,10 +201,9 @@@ public class CpEntityManagerFactory imp private EntityManager _getEntityManager( UUID applicationId ) { - EntityManager em = new CpEntityManager(cassandraService, counterUtils, indexService, managerCache, metricsFactory, entityManagerFig, + EntityManager em = new CpEntityManager(cassandraService, counterUtils, indexService, managerCache, + metricsFactory, entityManagerFig, pipelineBuilderFactory, graphManagerFactory, applicationId ); + - - pipelineBuilderFactory, applicationId ); return em; } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/87963740/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java ---------------------------------------------------------------------- diff --cc stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java index d7ba7e4,b57ea92..1f7d4de --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java @@@ -176,123 -159,26 +169,77 @@@ public class IndexServiceImpl implement public Observable<IndexOperationMessage> deleteIndexEdge( final ApplicationScope applicationScope, final Edge edge ) { + final Observable<IndexOperationMessage> batches = + Observable.just( edge ).flatMap( edgeValue -> { + final ApplicationEntityIndex ei = entityIndexFactory.createApplicationEntityIndex( applicationScope ); + EntityIndexBatch batch = ei.createBatch(); - //TODO, query ES and remove this edge - throw new NotImplementedException( "Implement me" ); - } + //review why generating the Scope from the Source and the target node makes sense. + final IndexEdge fromSource = generateScopeFromSource( edge ); + final Id targetId = edge.getTargetNode(); + + CandidateResults targetEdgesToBeDeindexed = ei.getAllEdgeDocuments( fromSource, targetId ); + + + //1. Feed the observable the candidate results you got back. Since it now does the aggregation for you + // you don't need to worry about putting your code in a do while. + batch = deindexBatchIteratorResolver( fromSource, targetEdgesToBeDeindexed, batch ); - - - ++ + final IndexEdge fromTarget = generateScopeFromTarget( edge ); + final Id sourceId = edge.getSourceNode(); + + CandidateResults sourceEdgesToBeDeindexed = ei.getAllEdgeDocuments( fromTarget, sourceId ); + + batch = deindexBatchIteratorResolver( fromTarget, sourceEdgesToBeDeindexed, batch ); + + return batch.execute(); + } ); + + return ObservableTimer.time( batches, addTimer ); + } + + //This should look up the entityId and delete any documents with a timestamp that comes before + //The edges that are connected will be compacted away from the graph. @Override public Observable<IndexOperationMessage> deleteEntityIndexes( final ApplicationScope applicationScope, - final Id entityId ) { + final Id entityId, final UUID markedVersion ) { - //TODO query ES and remove this entityId - throw new NotImplementedException( "Implement me" ); - } + //bootstrap the lower modules from their caches + final ApplicationEntityIndex ei = entityIndexFactory.createApplicationEntityIndex( applicationScope ); + CandidateResults crs = ei.getAllEntityVersionsBeforeMarkedVersion( entityId, markedVersion ); + //not actually sure about the timestamp but ah well. works. + SearchEdge searchEdge = createSearchEdgeFromSource( new SimpleEdge( applicationScope.getApplication(), + CpNamingUtils.getEdgeTypeFromCollectionName( InflectionUtils.pluralize( entityId.getType() ) ), entityId, + entityId.getUuid().timestamp() ) ); + final Observable<IndexOperationMessage> batches = Observable.from( crs ) + //collect results into a single batch + .collect( () -> ei.createBatch(), ( batch, candidateResult ) -> { + logger.debug( "Deindexing on edge {} for entity {} added to batch",searchEdge , entityId ); + batch.deindex( searchEdge, candidateResult ); + } ) + //return the future from the batch execution + .flatMap( batch -> batch.execute() ); + return ObservableTimer.time(batches, indexTimer); + } - - /** - * Get index edges to the target. Used in only certain entity types, such as roles, users, groups etc - * where we doubly index on both directions of the edge - * - * @param graphManager The graph manager - * @param entityId The entity's id - */ - private Observable<IndexEdge> getIndexEdgesAsTarget( final GraphManager graphManager, final Id entityId ) { - - final String collectionName = InflectionUtils.pluralize( entityId.getType() ); - - - final CollectionInfo collection = getDefaultSchema().getCollection( Application.ENTITY_TYPE, collectionName ); - - //nothing to do - if ( collection == null ) { - return Observable.empty(); - } - - - final String linkedCollection = collection.getLinkedCollection(); - - /** - * Nothing to link - */ - if ( linkedCollection == null ) { - return Observable.empty(); - } - - - /** - * An observable of sizes as we execute batches - * - * we're indexing from target->source here - */ - return edgesObservable.getEdgesFromSource( graphManager, entityId, linkedCollection ) - .map( edge -> generateScopeFromTarget( edge ) ); - } - + /** + * Takes in candidate results and uses the iterator to create batch commands + */ + public EntityIndexBatch deindexBatchIteratorResolver(IndexEdge edge,CandidateResults edgesToBeDeindexed, EntityIndexBatch batch){ + Iterator itr = edgesToBeDeindexed.iterator(); + while( itr.hasNext() ) { + batch.deindex( edge, ( CandidateResult ) itr.next()); + } + return batch; + } - - - - }