Merge branch 'two-dot-o-dev' of 
https://git-wip-us.apache.org/repos/asf/incubator-usergrid into USERGRID-608

# By Todd Nine (5) and Shawn Feldman (2)
# Via Shawn Feldman (2) and Todd Nine (1)
* 'two-dot-o-dev' of https://git-wip-us.apache.org/repos/asf/incubator-usergrid:
  observable changes
  add is application method
  Updated bi-directional to create 2 directional indexes
  Finishes testing of connections
  Updates pipeline and fixes connectionref querying
  Refactors  operations into easier build pattern.  Pipeline still need some 
work.
  Refactor of pipeline to support type mapping for clarity

Conflicts:
        
stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
        
stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
        
stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/87963740
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/87963740
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/87963740

Branch: refs/heads/USERGRID-669
Commit: 87963740a16c796710c64cb0f67b3195992e9170
Parents: 29a4009 1396ebe
Author: GERey <gre...@apigee.com>
Authored: Tue May 26 16:06:17 2015 -0700
Committer: GERey <gre...@apigee.com>
Committed: Tue May 26 16:06:17 2015 -0700

----------------------------------------------------------------------
 .../corepersistence/CpEntityManager.java        |  24 +-
 .../corepersistence/CpEntityManagerFactory.java |   3 +-
 .../corepersistence/CpRelationManager.java      | 219 ++++++++------
 .../corepersistence/index/IndexServiceImpl.java |  57 +---
 .../corepersistence/pipeline/Pipeline.java      |  93 +++---
 .../pipeline/PipelineBuilderFactory.java        |  39 ---
 .../pipeline/PipelineModule.java                |  20 +-
 .../pipeline/PipelineOperation.java             |   2 +-
 .../pipeline/PipelineOperations.java            |  30 ++
 .../pipeline/builder/CandidateBuilder.java      |  67 +++++
 .../pipeline/builder/ConnectionRefBuilder.java  |  53 ++++
 .../pipeline/builder/EntityBuilder.java         |  54 ++++
 .../pipeline/builder/IdBuilder.java             | 151 ++++++++++
 .../pipeline/builder/PipelineBuilder.java       | 100 +++++++
 .../builder/PipelineBuilderFactory.java         |  35 +++
 .../pipeline/read/AbstractFilter.java           |   2 +-
 .../pipeline/read/AbstractPathFilter.java       |   2 +-
 .../pipeline/read/Collector.java                |  38 ---
 .../pipeline/read/CollectorFactory.java         |  38 ---
 .../corepersistence/pipeline/read/Filter.java   |  31 --
 .../pipeline/read/FilterFactory.java            |  74 +++--
 .../pipeline/read/ReadFilterFactoryImpl.java    | 136 ---------
 .../pipeline/read/ReadPipelineBuilder.java      | 104 -------
 .../pipeline/read/ReadPipelineBuilderImpl.java  | 296 -------------------
 .../pipeline/read/ResultsPage.java              |  10 +-
 .../read/collect/AbstractCollector.java         |  46 ---
 .../read/collect/ConnectionRefFilter.java       |  68 +++++
 .../read/collect/ConnectionRefResumeFilter.java |  86 ++++++
 .../pipeline/read/collect/EntityFilter.java     |  68 -----
 .../read/collect/EntityResumeFilter.java        |  67 +++++
 .../pipeline/read/collect/IdResumeFilter.java   |  61 ++++
 .../read/collect/ResultsPageCollector.java      |  35 ++-
 .../AbstractElasticSearchFilter.java            | 171 -----------
 .../pipeline/read/elasticsearch/Candidate.java  |  55 ----
 .../elasticsearch/CandidateEntityFilter.java    | 234 ---------------
 .../read/elasticsearch/CandidateIdFilter.java   | 201 -------------
 .../ElasticSearchCollectionFilter.java          |  77 -----
 .../ElasticSearchConnectionFilter.java          |  73 -----
 .../ElasticsearchCursorSerializer.java          |  42 ---
 .../read/elasticsearch/Elasticsearchdiagram.jpg | Bin 316655 -> 0 bytes
 .../graph/AbstractReadGraphEdgeByIdFilter.java  |  82 -----
 .../read/graph/AbstractReadGraphFilter.java     | 147 ---------
 .../read/graph/EdgeCursorSerializer.java        |  42 ---
 .../pipeline/read/graph/EntityIdFilter.java     |  54 ----
 .../pipeline/read/graph/EntityLoadFilter.java   | 155 ----------
 .../pipeline/read/graph/GraphDiagram.jpg        | Bin 347711 -> 0 bytes
 .../graph/ReadGraphCollectionByIdFilter.java    |  49 ---
 .../read/graph/ReadGraphCollectionFilter.java   |  53 ----
 .../graph/ReadGraphConnectionByIdFilter.java    |  50 ----
 .../graph/ReadGraphConnectionByTypeFilter.java  | 100 -------
 .../read/graph/ReadGraphConnectionFilter.java   |  53 ----
 .../search/AbstractElasticSearchFilter.java     | 169 +++++++++++
 .../pipeline/read/search/Candidate.java         |  55 ++++
 .../read/search/CandidateEntityFilter.java      | 232 +++++++++++++++
 .../pipeline/read/search/CandidateIdFilter.java | 190 ++++++++++++
 .../search/ElasticsearchCursorSerializer.java   |  40 +++
 .../read/search/Elasticsearchdiagram.jpg        | Bin 0 -> 316655 bytes
 .../read/search/SearchCollectionFilter.java     |  77 +++++
 .../read/search/SearchConnectionFilter.java     |  72 +++++
 .../AbstractReadGraphEdgeByIdFilter.java        |  82 +++++
 .../read/traverse/AbstractReadGraphFilter.java  | 146 +++++++++
 .../read/traverse/EdgeCursorSerializer.java     |  42 +++
 .../pipeline/read/traverse/EntityIdFilter.java  |  53 ++++
 .../read/traverse/EntityLoadVerifyFilter.java   | 154 ++++++++++
 .../pipeline/read/traverse/GraphDiagram.jpg     | Bin 0 -> 347711 bytes
 .../traverse/ReadGraphCollectionByIdFilter.java |  49 +++
 .../traverse/ReadGraphCollectionFilter.java     |  53 ++++
 .../traverse/ReadGraphConnectionByIdFilter.java |  50 ++++
 .../ReadGraphConnectionByTypeFilter.java        |  99 +++++++
 .../traverse/ReadGraphConnectionFilter.java     |  53 ++++
 .../results/ConnectionRefQueryExecutor.java     |  60 ++++
 .../results/EntityQueryExecutor.java            |  84 ++++++
 .../results/ObservableQueryExecutor.java        |  52 ++--
 .../corepersistence/results/QueryExecutor.java  |   1 +
 .../corepersistence/util/CpNamingUtils.java     |   5 +
 .../pipeline/cursor/CursorTest.java             |   4 +-
 76 files changed, 2832 insertions(+), 2737 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/87963740/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
----------------------------------------------------------------------
diff --cc 
stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
index 2166827,e3bbf23..6d0e772
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
@@@ -167,9 -179,7 +181,9 @@@ public class CpEntityManager implement
  
      private final AsyncEventService indexService;
  
-     private PipelineBuilderFactory pipelineBuilderFactory;
 -    private final PipelineBuilderFactory filterFactory;
++    private final PipelineBuilderFactory pipelineBuilderFactory;
 +
 +    private final GraphManagerFactory graphManagerFactory;
  
      private boolean skipAggregateCounters;
      private MetricsFactory metricsFactory;
@@@ -211,8 -221,7 +225,9 @@@
       */
      public CpEntityManager( final CassandraService cass, final CounterUtils 
counterUtils, final AsyncEventService indexService, final ManagerCache 
managerCache,
                              final MetricsFactory metricsFactory, final 
EntityManagerFig entityManagerFig,
 -                            final PipelineBuilderFactory 
pipelineBuilderFactory,  final UUID applicationId ) {
 +                            final PipelineBuilderFactory 
pipelineBuilderFactory ,
 +                            final GraphManagerFactory 
graphManagerFactory,final UUID applicationId ) {
++
          this.entityManagerFig = entityManagerFig;
  
  
@@@ -221,12 -230,10 +236,13 @@@
          Preconditions.checkNotNull( managerCache, "managerCache must not be 
null" );
          Preconditions.checkNotNull( applicationId, "applicationId must not be 
null" );
          Preconditions.checkNotNull( indexService, "indexService must not be 
null" );
 -        Preconditions.checkNotNull( pipelineBuilderFactory, "filterFactory 
must not be null" );
 -        this.filterFactory = pipelineBuilderFactory;
 +        Preconditions.checkNotNull( pipelineBuilderFactory, 
"pipelineBuilderFactory must not be null" );
 +        Preconditions.checkNotNull( graphManagerFactory, "graphManagerFactory 
must not be null" );
 +        this.pipelineBuilderFactory = pipelineBuilderFactory;
 +        this.graphManagerFactory = graphManagerFactory;
 +
  
+ 
          this.managerCache = managerCache;
          this.applicationId = applicationId;
          this.indexService = indexService;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/87963740/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
----------------------------------------------------------------------
diff --cc 
stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
index b11973e,c9e35f0..5b4af6d
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
@@@ -201,8 -198,10 +201,9 @@@ public class CpEntityManagerFactory imp
  
  
      private EntityManager _getEntityManager( UUID applicationId ) {
 -        EntityManager em = new CpEntityManager(cassandraService, 
counterUtils, indexService, managerCache, metricsFactory, entityManagerFig,
 +        EntityManager em = new CpEntityManager(cassandraService, 
counterUtils, indexService, managerCache,
 +            metricsFactory, entityManagerFig, pipelineBuilderFactory, 
graphManagerFactory, applicationId );
+ 
 -
 -            pipelineBuilderFactory, applicationId );
          return em;
      }
  

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/87963740/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
----------------------------------------------------------------------
diff --cc 
stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
index d7ba7e4,b57ea92..1f7d4de
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
@@@ -176,123 -159,26 +169,77 @@@ public class IndexServiceImpl implement
      public Observable<IndexOperationMessage> deleteIndexEdge( final 
ApplicationScope applicationScope,
                                                                final Edge edge 
) {
  
 +        final Observable<IndexOperationMessage> batches =
 +            Observable.just( edge ).flatMap( edgeValue -> {
 +                final ApplicationEntityIndex ei = 
entityIndexFactory.createApplicationEntityIndex( applicationScope );
 +                EntityIndexBatch batch = ei.createBatch();
  
 -        //TODO, query ES and remove this edge
  
 -        throw new NotImplementedException( "Implement me" );
 -    }
 +                //review why generating the Scope from the Source  and the 
target node makes sense.
 +                final IndexEdge fromSource = generateScopeFromSource( edge );
 +                final Id targetId = edge.getTargetNode();
 +
 +                CandidateResults targetEdgesToBeDeindexed = 
ei.getAllEdgeDocuments( fromSource, targetId );
 +
 +
 +                //1. Feed the observable the candidate results you got back. 
Since it now does the aggregation for you
 +                // you don't need to worry about putting your code in a do 
while.
  
  
 +                batch = deindexBatchIteratorResolver( fromSource, 
targetEdgesToBeDeindexed, batch );
- 
- 
- 
++                
 +                final IndexEdge fromTarget = generateScopeFromTarget( edge );
 +                final Id sourceId = edge.getSourceNode();
 +
 +                CandidateResults sourceEdgesToBeDeindexed = 
ei.getAllEdgeDocuments( fromTarget, sourceId );
 +
 +                batch = deindexBatchIteratorResolver( fromTarget, 
sourceEdgesToBeDeindexed, batch );
 +
 +                return batch.execute();
 +            } );
 +
 +        return ObservableTimer.time( batches, addTimer );
 +    }
 +
 +    //This should look up the entityId and delete any documents with a 
timestamp that comes before
 +    //The edges that are connected will be compacted away from the graph.
      @Override
      public Observable<IndexOperationMessage> deleteEntityIndexes( final 
ApplicationScope applicationScope,
 -                                                                  final Id 
entityId ) {
 +                                                                  final Id 
entityId, final UUID markedVersion ) {
  
 -        //TODO query ES and remove this entityId
 -        throw new NotImplementedException( "Implement me" );
 -    }
 +        //bootstrap the lower modules from their caches
 +        final ApplicationEntityIndex ei = 
entityIndexFactory.createApplicationEntityIndex( applicationScope );
  
 +        CandidateResults crs = ei.getAllEntityVersionsBeforeMarkedVersion( 
entityId, markedVersion );
  
 +        //not actually sure about the timestamp but ah well. works.
 +        SearchEdge searchEdge = createSearchEdgeFromSource( new SimpleEdge( 
applicationScope.getApplication(),
 +            CpNamingUtils.getEdgeTypeFromCollectionName( 
InflectionUtils.pluralize( entityId.getType() ) ), entityId,
 +            entityId.getUuid().timestamp() ) );
  
  
 +        final Observable<IndexOperationMessage>  batches = Observable.from( 
crs )
 +                //collect results into a single batch
 +                .collect( () -> ei.createBatch(), ( batch, candidateResult ) 
-> {
 +                    logger.debug( "Deindexing on edge {} for entity {} added 
to batch",searchEdge , entityId );
 +                    batch.deindex( searchEdge, candidateResult );
 +                } )
 +                    //return the future from the batch execution
 +                .flatMap( batch -> batch.execute() );
  
 +        return ObservableTimer.time(batches, indexTimer);
 +    }
  
- 
-     /**
-      * Get index edges to the target.  Used in only certain entity types, 
such as roles, users, groups etc
-      * where we doubly index on both directions of the edge
-      *
-      * @param graphManager The graph manager
-      * @param entityId The entity's id
-      */
-     private Observable<IndexEdge> getIndexEdgesAsTarget( final GraphManager 
graphManager, final Id entityId ) {
- 
-             final String collectionName = InflectionUtils.pluralize( 
entityId.getType() );
- 
- 
-         final CollectionInfo collection = getDefaultSchema().getCollection( 
Application.ENTITY_TYPE, collectionName );
- 
-         //nothing to do
-         if ( collection == null ) {
-             return Observable.empty();
-         }
- 
- 
-         final String linkedCollection = collection.getLinkedCollection();
- 
-         /**
-          * Nothing to link
-          */
-         if ( linkedCollection == null ) {
-             return Observable.empty();
-         }
- 
- 
-         /**
-          * An observable of sizes as we execute batches
-          *
-          * we're indexing from target->source here
-          */
-         return edgesObservable.getEdgesFromSource( graphManager, entityId, 
linkedCollection )
-                               .map( edge -> generateScopeFromTarget( edge ) );
-     }
- 
 +    /**
 +     * Takes in candidate results and uses the iterator to create batch 
commands
 +     */
  
 +    public EntityIndexBatch deindexBatchIteratorResolver(IndexEdge 
edge,CandidateResults edgesToBeDeindexed, EntityIndexBatch batch){
 +        Iterator itr = edgesToBeDeindexed.iterator();
 +        while( itr.hasNext() ) {
 +            batch.deindex( edge, ( CandidateResult ) itr.next());
 +        }
 +        return batch;
 +    }
  
- 
- 
- 
- 
  }

Reply via email to