First pass of changing cp and rm manager to use batches
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/bba08ddc Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/bba08ddc Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/bba08ddc Branch: refs/heads/two-dot-o Commit: bba08ddc1ed27b719ecaf46c63eb3f20d4bef8f5 Parents: fba71c2 Author: Todd Nine <toddn...@apache.org> Authored: Wed Oct 1 22:47:06 2014 -0600 Committer: Todd Nine <toddn...@apache.org> Committed: Wed Oct 1 22:47:06 2014 -0600 ---------------------------------------------------------------------- .../CpEntityIndexDeleteListener.java | 8 +- .../corepersistence/CpEntityManager.java | 40 ++++++--- .../corepersistence/CpEntityManagerFactory.java | 33 ++++--- .../corepersistence/CpManagerCache.java | 12 +-- .../corepersistence/CpRelationManager.java | 93 +++++++++++++------- .../CpEntityIndexDeleteListenerTest.java | 12 ++- .../core/scope/ApplicationScopeImpl.java | 6 +- .../serialization/EdgeSerializationTest.java | 3 + .../index/impl/EsEntityIndexBatchImpl.java | 42 +++++++-- .../index/impl/EsEntityIndexImpl.java | 2 +- .../index/utils/IndexValidationUtils.java | 10 +++ 11 files changed, 177 insertions(+), 84 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/bba08ddc/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityIndexDeleteListener.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityIndexDeleteListener.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityIndexDeleteListener.java index f8b28ac..cf3f4a6 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityIndexDeleteListener.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityIndexDeleteListener.java @@ -59,13 +59,13 @@ public class CpEntityIndexDeleteListener { public Observable<EntityVersion> receive(final MvccEntityDeleteEvent event) { - CollectionScope collectionScope = event.getCollectionScope(); - IndexScope indexScope = new IndexScopeImpl(collectionScope.getApplication(), collectionScope.getOwner(), collectionScope.getName()); + final CollectionScope collectionScope = event.getCollectionScope(); + final IndexScope indexScope = new IndexScopeImpl(collectionScope.getApplication(), collectionScope.getOwner(), collectionScope.getName()); final EntityIndex entityIndex = entityIndexFactory.createEntityIndex(indexScope); return Observable.create(new ObservableIterator<CandidateResult>("deleteEsIndexVersions") { @Override protected Iterator<CandidateResult> getIterator() { - CandidateResults results = entityIndex.getEntityVersions(event.getEntity().getId()); + CandidateResults results = entityIndex.getEntityVersions(indexScope, event.getEntity().getId()); return results.iterator(); } }).subscribeOn(Schedulers.io()) @@ -78,7 +78,7 @@ public class CpEntityIndexDeleteListener { //filter find entities <= current version if (entity.getVersion().timestamp() <= event.getVersion().timestamp()) { versions.add(entity); - entityIndex.deindex(entity.getId(), entity.getVersion()); + entityIndex.createBatch().deindex(indexScope, entity.getId(), entity.getVersion()); } } return Observable.from(versions); http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/bba08ddc/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java index f85ba30..a5475e7 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java @@ -74,6 +74,7 @@ import org.apache.usergrid.persistence.exceptions.EntityNotFoundException; import org.apache.usergrid.persistence.exceptions.RequiredPropertyNotFoundException; import org.apache.usergrid.persistence.exceptions.UnexpectedEntityTypeException; import org.apache.usergrid.persistence.index.EntityIndex; +import org.apache.usergrid.persistence.index.EntityIndexBatch; import org.apache.usergrid.persistence.index.IndexScope; import org.apache.usergrid.persistence.index.impl.IndexScopeImpl; import org.apache.usergrid.persistence.index.query.CounterResolution; @@ -561,6 +562,12 @@ public class CpEntityManager implements EntityManager { logger.debug( "Deleting indexes of all {} collections owning the entity", owners.keySet().size() ); + final EntityIndex ei = managerCache.getEntityIndex( appScope ); + + final EntityIndexBatch batch = ei.createBatch(); + + + for ( String ownerType : owners.keySet() ) { Map<UUID, Set<String>> collectionsByUuid = owners.get( ownerType ); @@ -573,27 +580,30 @@ public class CpEntityManager implements EntityManager { new SimpleId( uuid, ownerType ), CpEntityManager.getCollectionScopeNameFromCollectionName(coll) ); - EntityIndex ei = managerCache.getEntityIndex( indexScope ); - ei.deindex( entity ); + batch.index( indexScope, entity ); } } } + + // deindex from default index scope IndexScope defaultIndexScope = new IndexScopeImpl( appScope.getApplication(), appScope.getApplication(), getCollectionScopeNameFromEntityType( entityRef.getType() ) ); - EntityIndex entityIndex = managerCache.getEntityIndex( defaultIndexScope ); - entityIndex.deindex( entity ); + + batch.deindex(defaultIndexScope, entity ); IndexScope allTypesIndexScope = new IndexScopeImpl( appScope.getApplication(), appScope.getApplication(), ALL_TYPES); - EntityIndex aei = managerCache.getEntityIndex( allTypesIndexScope ); - aei.deindex( entity ); + + batch.deindex( allTypesIndexScope, entity ); + + batch.execute(); decrementEntityCollection( Schema.defaultCollectionName( entityId.getType() ) ); @@ -980,7 +990,7 @@ public class CpEntityManager implements EntityManager { getCollectionScopeNameFromEntityType( entityRef.getType()) ); EntityCollectionManager ecm = managerCache.getEntityCollectionManager( collectionScope ); - EntityIndex ei = managerCache.getEntityIndex( defaultIndexScope ); + EntityIndex ei = managerCache.getEntityIndex( appScope ); Id entityId = new SimpleId( entityRef.getUuid(), entityRef.getType() ); @@ -1002,7 +1012,8 @@ public class CpEntityManager implements EntityManager { logger.debug("Wrote {}:{} version {}", new Object[] { cpEntity.getId().getType(), cpEntity.getId().getUuid(), cpEntity.getVersion() }); - ei.index( cpEntity ); + + ei.createBatch().index(defaultIndexScope, cpEntity ).execute(); // update in all containing collections and connection indexes CpRelationManager rm = (CpRelationManager)getRelationManager( entityRef ); @@ -2530,12 +2541,13 @@ public class CpEntityManager implements EntityManager { } // Index CP entity into default collection scope - IndexScope defaultIndexScope = new IndexScopeImpl( - appScope.getApplication(), - appScope.getApplication(), - CpEntityManager.getCollectionScopeNameFromEntityType( entity.getType() ) ); - EntityIndex ei = managerCache.getEntityIndex( defaultIndexScope ); - ei.index( cpEntity ); +// IndexScope defaultIndexScope = new IndexScopeImpl( +// appScope.getApplication(), +// appScope.getApplication(), +// CpEntityManager.getCollectionScopeNameFromEntityType( entity.getType() ) ); +// EntityIndex ei = managerCache.getEntityIndex( appScope ); +// +// ei.createBatch().index( defaultIndexScope, cpEntity ).execute(); // reflect changes in the legacy Entity entity.setUuid( cpEntity.getId().getUuid() ); http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/bba08ddc/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java index 3e9a6a9..090a4d6 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java @@ -50,6 +50,7 @@ import org.apache.usergrid.persistence.exceptions.ApplicationAlreadyExistsExcept import org.apache.usergrid.persistence.exceptions.DuplicateUniquePropertyExistsException; import org.apache.usergrid.persistence.graph.GraphManagerFactory; import org.apache.usergrid.persistence.index.EntityIndex; +import org.apache.usergrid.persistence.index.EntityIndexBatch; import org.apache.usergrid.persistence.index.EntityIndexFactory; import org.apache.usergrid.persistence.index.IndexScope; import org.apache.usergrid.persistence.index.query.CandidateResult; @@ -263,8 +264,7 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application .getEntityIndex( SYSTEM_ORGS_INDEX_SCOPE ); orgInfoEntity = ecm.write( orgInfoEntity ).toBlockingObservable().last(); - eci.index( orgInfoEntity ); - eci.refresh(); + eci.createBatch().index(SYSTEM_ORGS_INDEX_SCOPE, orgInfoEntity ).executeAndRefresh(); } if ( properties == null ) { @@ -288,7 +288,7 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application .getEntityIndex( SYSTEM_APPS_INDEX_SCOPE ); appInfoEntity = ecm.write( appInfoEntity ).toBlockingObservable().last(); - eci.index( appInfoEntity ); + eci.createBatch().index(SYSTEM_APPS_INDEX_SCOPE, appInfoEntity ).executeAndRefresh(); eci.refresh(); } @@ -325,7 +325,7 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application Query q = Query.fromQL(PROPERTY_NAME + " = '" + name + "'"); EntityIndex ei = getManagerCache().getEntityIndex( SYSTEM_ORGS_INDEX_SCOPE ); - CandidateResults results = ei.search( q ); + CandidateResults results = ei.search(SYSTEM_ORGS_INDEX_SCOPE, q ); if ( results.isEmpty() ) { return null; @@ -342,7 +342,7 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application EntityIndex ei = getManagerCache().getEntityIndex( SYSTEM_APPS_INDEX_SCOPE ); - CandidateResults results = ei.search( q ); + CandidateResults results = ei.search(SYSTEM_APPS_INDEX_SCOPE, q ); if ( results.isEmpty() ) { return null; @@ -371,7 +371,7 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application Query q = Query.fromQL("select *"); q.setCursor( cursor ); - CandidateResults results = ei.search( q ); + CandidateResults results = ei.search(SYSTEM_APPS_INDEX_SCOPE, q ); cursor = results.getCursor(); Iterator<CandidateResult> iter = results.iterator(); @@ -417,7 +417,7 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application Query q = Query.fromQL("select *"); - CandidateResults results = ei.search( q ); + CandidateResults results = ei.search(SYSTEM_PROPS_INDEX_SCOPE, q ); if ( results.isEmpty() ) { return new HashMap<String,String>(); @@ -447,7 +447,7 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application .getEntityIndex( SYSTEM_PROPS_INDEX_SCOPE ); Query q = Query.fromQL("select *"); - CandidateResults results = ei.search( q ); + CandidateResults results = ei.search(SYSTEM_PROPS_INDEX_SCOPE, q ); Entity propsEntity; if ( !results.isEmpty() ) { propsEntity = em.load( results.iterator().next().getId()).toBlockingObservable().last(); @@ -464,7 +464,7 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application } propsEntity = em.write( propsEntity ).toBlockingObservable().last(); - ei.index( propsEntity ); + ei.createBatch().index( SYSTEM_PROPS_INDEX_SCOPE, propsEntity ); return true; } @@ -485,7 +485,7 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application EntityIndex ei = getManagerCache().getEntityIndex( SYSTEM_PROPS_INDEX_SCOPE ); Query q = Query.fromQL("select *"); - CandidateResults results = ei.search( q ); + CandidateResults results = ei.search(SYSTEM_PROPS_INDEX_SCOPE, q ); Entity propsEntity = em.load( results.iterator().next().getId() ).toBlockingObservable().last(); @@ -501,7 +501,7 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application propsEntity.removeField( name ); propsEntity = em.write( propsEntity ).toBlockingObservable().last(); - ei.index( propsEntity ); + ei.createBatch().index( SYSTEM_PROPS_INDEX_SCOPE, propsEntity ); return true; } @@ -616,9 +616,10 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application EntityIndex ei = managerCache.getEntityIndex( is ); Query q = Query.fromQL("select *"); - CandidateResults results = ei.search( q ); + CandidateResults results = ei.search(is, q ); - Map<String, UUID> appMap = new HashMap<String, UUID>(); + int count = 0; + final EntityIndexBatch batch = ei.createBatch(); Iterator<CandidateResult> iter = results.iterator(); while (iter.hasNext()) { @@ -643,7 +644,11 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application } ); - ei.index(entity); + batch.index(is, entity); + + + + count++; } } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/bba08ddc/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpManagerCache.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpManagerCache.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpManagerCache.java index 2f8bd3f..0e7c084 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpManagerCache.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpManagerCache.java @@ -36,8 +36,8 @@ class CpManagerCache { private final LRUCache2<CollectionScope, EntityCollectionManager> ecmCache = new LRUCache2<CollectionScope, EntityCollectionManager>(50, 1 * 60 * 60 * 1000); - private final LRUCache2<IndexScope, EntityIndex> eiCache - = new LRUCache2<IndexScope, EntityIndex>(50, 1 * 60 * 60 * 1000); + private final LRUCache2<ApplicationScope, EntityIndex> eiCache + = new LRUCache2<>(50, 1 * 60 * 60 * 1000); private final LRUCache2<ApplicationScope, GraphManager> gmCache = new LRUCache2<ApplicationScope, GraphManager>(50, 1 * 60 * 60 * 1000); @@ -61,13 +61,13 @@ class CpManagerCache { return ecm; } - public EntityIndex getEntityIndex(IndexScope indexScope) { + public EntityIndex getEntityIndex(ApplicationScope applicationScope) { - EntityIndex ei = eiCache.get(indexScope); + EntityIndex ei = eiCache.get(applicationScope); if (ei == null) { - ei = eif.createEntityIndex(indexScope); - eiCache.put(indexScope, ei); + ei = eif.createEntityIndex(applicationScope); + eiCache.put(applicationScope, ei); } return ei; } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/bba08ddc/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java index 3486fbc..260e429 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java @@ -28,6 +28,7 @@ import java.util.Map; import java.util.Set; import java.util.UUID; +import org.apache.usergrid.persistence.index.EntityIndexBatch; import org.apache.usergrid.utils.UUIDUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -425,6 +426,11 @@ public class CpRelationManager implements RelationManager { // loop through all types of edge to target int count = 0; + + final EntityIndex ei = managerCache.getEntityIndex( applicationScope ); + + final EntityIndexBatch entityIndexBatch = ei.createBatch(); + while ( edgeTypesToTarget.hasNext() ) { // get all edges of the type @@ -460,10 +466,9 @@ public class CpRelationManager implements RelationManager { applicationScope.getApplication(), new SimpleId(sourceEntity.getUuid(), sourceEntity.getType()), CpEntityManager.getConnectionScopeName( cpHeadEntity.getId().getType(), connName )); - } - - EntityIndex ei = managerCache.getEntityIndex(indexScope); - ei.index(cpEntity); + } + + entityIndexBatch.index(indexScope, cpEntity); // reindex the entity in the source entity's all-types index @@ -471,8 +476,8 @@ public class CpRelationManager implements RelationManager { applicationScope.getApplication(), new SimpleId(sourceEntity.getUuid(), sourceEntity.getType()), ALL_TYPES); - ei = managerCache.getEntityIndex(indexScope); - ei.index(cpEntity); + + entityIndexBatch.index(indexScope, cpEntity); count++; } @@ -659,6 +664,7 @@ public class CpRelationManager implements RelationManager { applicationScope.getApplication(), applicationScope.getApplication(), CpEntityManager.getCollectionScopeNameFromEntityType( itemRef.getType())); + EntityCollectionManager memberMgr = managerCache.getEntityCollectionManager(memberScope); org.apache.usergrid.persistence.model.entity.Entity memberEntity = memberMgr.load( @@ -700,29 +706,36 @@ public class CpRelationManager implements RelationManager { GraphManager gm = managerCache.getGraphManager(applicationScope); gm.writeEdge(edge).toBlockingObservable().last(); + final EntityIndex index = managerCache.getEntityIndex( applicationScope ); + + final EntityIndexBatch batch = index.createBatch(); + + // index member into entity collection | type scope IndexScope collectionIndexScope = new IndexScopeImpl( applicationScope.getApplication(), cpHeadEntity.getId(), CpEntityManager.getCollectionScopeNameFromCollectionName( collName )); - EntityIndex collectionIndex = managerCache.getEntityIndex(collectionIndexScope); - collectionIndex.index( memberEntity ); + + batch.index(collectionIndexScope, memberEntity ); // index member into entity | all-types scope IndexScope entityAllTypesScope = new IndexScopeImpl( applicationScope.getApplication(), cpHeadEntity.getId(), ALL_TYPES); - EntityIndex entityAllCollectionIndex = managerCache.getEntityIndex(entityAllTypesScope); - entityAllCollectionIndex.index( memberEntity ); + + batch.index(entityAllTypesScope, memberEntity ); // index member into application | all-types scope IndexScope appAllTypesScope = new IndexScopeImpl( applicationScope.getApplication(), applicationScope.getApplication(), ALL_TYPES); - EntityIndex allCollectionIndex = managerCache.getEntityIndex(appAllTypesScope); - allCollectionIndex.index( memberEntity ); + + batch.index( appAllTypesScope, memberEntity ); + + batch.execute(); logger.debug("Added entity {}:{} to collection {}", new String[] { itemRef.getUuid().toString(), itemRef.getType(), collName }); @@ -844,13 +857,16 @@ public class CpRelationManager implements RelationManager { org.apache.usergrid.persistence.model.entity.Entity memberEntity = memberMgr.load( new SimpleId( itemRef.getUuid(), itemRef.getType() )).toBlockingObservable().last(); + final EntityIndex ei = managerCache.getEntityIndex(applicationScope); + final EntityIndexBatch batch = ei.createBatch(); + // remove item from collection index IndexScope indexScope = new IndexScopeImpl( applicationScope.getApplication(), cpHeadEntity.getId(), CpEntityManager.getCollectionScopeNameFromCollectionName( collName )); - EntityIndex ei = managerCache.getEntityIndex(indexScope); - ei.deindex( memberEntity ); + + batch.deindex(indexScope, memberEntity ); // remove collection from item index IndexScope itemScope = new IndexScopeImpl( @@ -858,8 +874,11 @@ public class CpRelationManager implements RelationManager { memberEntity.getId(), CpEntityManager.getCollectionScopeNameFromCollectionName( Schema.defaultCollectionName( cpHeadEntity.getId().getType() ))); - ei = managerCache.getEntityIndex(itemScope); - ei.deindex( cpHeadEntity ); + + + batch.deindex(itemScope, cpHeadEntity ); + + batch.execute(); // remove edge from collection to item GraphManager gm = managerCache.getGraphManager(applicationScope); @@ -958,7 +977,8 @@ public class CpRelationManager implements RelationManager { applicationScope.getApplication(), cpHeadEntity.getId(), CpEntityManager.getCollectionScopeNameFromCollectionName( collName )); - EntityIndex ei = managerCache.getEntityIndex(indexScope); + + EntityIndex ei = managerCache.getEntityIndex(applicationScope); logger.debug("Searching scope {}:{}:{}", new String[] { @@ -969,7 +989,7 @@ public class CpRelationManager implements RelationManager { query.setEntityType( collection.getType() ); query = adjustQuery( query ); - CandidateResults crs = ei.search( query ); + CandidateResults crs = ei.search(indexScope, query ); return buildResults( query, crs, collName ); @@ -1088,21 +1108,25 @@ public class CpRelationManager implements RelationManager { GraphManager gm = managerCache.getGraphManager(applicationScope); gm.writeEdge(edge).toBlockingObservable().last(); + final EntityIndex ei = managerCache.getEntityIndex(applicationScope); + final EntityIndexBatch batch = ei.createBatch(); + // Index the new connection in app|source|type context IndexScope indexScope = new IndexScopeImpl( applicationScope.getApplication(), cpHeadEntity.getId(), CpEntityManager.getConnectionScopeName( connectedEntityRef.getType(), connectionType )); - EntityIndex ei = managerCache.getEntityIndex(indexScope); - ei.index( targetEntity ); + batch.index( indexScope, targetEntity ); // Index the new connection in app|scope|all-types context IndexScope allTypesIndexScope = new IndexScopeImpl( applicationScope.getApplication(), cpHeadEntity.getId(), ALL_TYPES); - EntityIndex aei = managerCache.getEntityIndex(allTypesIndexScope); - aei.index( targetEntity ); + + batch.index( allTypesIndexScope, targetEntity ); + + batch.execute(); Keyspace ko = cass.getApplicationKeyspace( applicationId ); Mutator<ByteBuffer> m = createMutator( ko, be ); @@ -1307,21 +1331,23 @@ public class CpRelationManager implements RelationManager { GraphManager gm = managerCache.getGraphManager(applicationScope); gm.deleteEdge(edge).toBlockingObservable().last(); + final EntityIndex ei = managerCache.getEntityIndex( applicationScope ) ; + final EntityIndexBatch batch = ei.createBatch(); + // Deindex the connection in app|source|type context IndexScope indexScope = new IndexScopeImpl( applicationScope.getApplication(), new SimpleId( connectingEntityRef.getUuid(), connectingEntityRef.getType() ), CpEntityManager.getConnectionScopeName( targetEntity.getId().getType(), connectionType )); - EntityIndex ei = managerCache.getEntityIndex( indexScope ); - ei.deindex( targetEntity ); + batch.deindex( indexScope , targetEntity ); // Deindex the connection in app|source|type context IndexScope allTypesIndexScope = new IndexScopeImpl( applicationScope.getApplication(), new SimpleId( connectingEntityRef.getUuid(), connectingEntityRef.getType() ), ALL_TYPES); - EntityIndex aei = managerCache.getEntityIndex(allTypesIndexScope); - aei.deindex( targetEntity ); + + batch.deindex( allTypesIndexScope, targetEntity ); } @@ -1377,7 +1403,9 @@ public class CpRelationManager implements RelationManager { applicationScope.getApplication(), cpHeadEntity.getId(), scopeName); - EntityIndex ei = managerCache.getEntityIndex(indexScope); + + final EntityIndex ei = managerCache.getEntityIndex(applicationScope); + logger.debug("Searching connected entities from scope {}:{}:{}", new String[] { indexScope.getApplication().toString(), @@ -1385,7 +1413,7 @@ public class CpRelationManager implements RelationManager { indexScope.getName()}); query = adjustQuery( query ); - CandidateResults crs = ei.search( query ); + CandidateResults crs = ei.search( indexScope, query ); raw = buildResults( query , crs, query.getConnectionType() ); } @@ -1480,7 +1508,8 @@ public class CpRelationManager implements RelationManager { applicationScope.getApplication(), cpHeadEntity.getId(), ALL_TYPES); - EntityIndex ei = managerCache.getEntityIndex(indexScope); + + EntityIndex ei = managerCache.getEntityIndex(applicationScope); logger.debug("Searching connections from the all-types scope {}:{}:{}", new String[] { indexScope.getApplication().toString(), @@ -1488,7 +1517,7 @@ public class CpRelationManager implements RelationManager { indexScope.getName()}); query = adjustQuery( query ); - CandidateResults crs = ei.search( query ); + CandidateResults crs = ei.search(indexScope, query ); return buildConnectionResults(query , crs, query.getConnectionType() ); } @@ -1498,7 +1527,7 @@ public class CpRelationManager implements RelationManager { cpHeadEntity.getId(), CpEntityManager.getConnectionScopeName( query.getEntityType(), query.getConnectionType() )); - EntityIndex ei = managerCache.getEntityIndex(indexScope); + EntityIndex ei = managerCache.getEntityIndex(applicationScope); logger.debug("Searching connections from the '{}' scope {}:{}:{}", new String[] { indexScope.getApplication().toString(), @@ -1506,7 +1535,7 @@ public class CpRelationManager implements RelationManager { indexScope.getName()}); query = adjustQuery( query ); - CandidateResults crs = ei.search( query ); + CandidateResults crs = ei.search( indexScope, query ); return buildConnectionResults(query , crs, query.getConnectionType() ); } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/bba08ddc/stack/core/src/test/java/org/apache/usergrid/corepersistence/CpEntityIndexDeleteListenerTest.java ---------------------------------------------------------------------- diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/CpEntityIndexDeleteListenerTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/CpEntityIndexDeleteListenerTest.java index 6ee62c6..d59432b 100644 --- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/CpEntityIndexDeleteListenerTest.java +++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/CpEntityIndexDeleteListenerTest.java @@ -35,6 +35,7 @@ import org.apache.usergrid.persistence.collection.mvcc.entity.impl.MvccEntityImp import org.apache.usergrid.persistence.collection.serialization.SerializationFig; import org.apache.usergrid.persistence.core.entity.EntityVersion; import org.apache.usergrid.persistence.index.EntityIndex; +import org.apache.usergrid.persistence.index.EntityIndexBatch; import org.apache.usergrid.persistence.index.EntityIndexFactory; import org.apache.usergrid.persistence.index.IndexScope; import org.apache.usergrid.persistence.index.query.CandidateResult; @@ -82,6 +83,10 @@ public class CpEntityIndexDeleteListenerTest { when(scope.getApplication()).thenReturn(entityId); when(eif.createEntityIndex(any(IndexScope.class))).thenReturn(entityIndex); + final EntityIndexBatch batch = mock(EntityIndexBatch.class); + + when(entityIndex.createBatch()).thenReturn( batch ); + CandidateResults results = mock(CandidateResults.class); List<CandidateResult> resultsList = new ArrayList<>(); resultsList.add(entity); @@ -90,7 +95,7 @@ public class CpEntityIndexDeleteListenerTest { when(results.iterator()).thenReturn(entities); when(serializationFig.getBufferSize()).thenReturn(10); when(serializationFig.getHistorySize()).thenReturn(20); - when(entityIndex.getEntityVersions(entityId)).thenReturn(results); + when(entityIndex.getEntityVersions(any(IndexScope.class), entityId)).thenReturn(results); MvccEntity mvccEntity = new MvccEntityImpl(entityId,uuid, MvccEntity.Status.COMPLETE,mock(Entity.class)); @@ -98,6 +103,9 @@ public class CpEntityIndexDeleteListenerTest { Observable<EntityVersion> o = esEntityIndexDeleteListener.receive(event); EntityVersion testEntity = o.toBlocking().last(); assertEquals(testEntity.getId(),mvccEntity.getId()); - verify(entityIndex).deindex(entity.getId(),entity.getVersion()); + + verify(entityIndex).createBatch(); + + verify(batch).deindex(any(IndexScope.class), entity.getId(),entity.getVersion()); } } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/bba08ddc/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/scope/ApplicationScopeImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/scope/ApplicationScopeImpl.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/scope/ApplicationScopeImpl.java index 4e067c2..e8dbb02 100644 --- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/scope/ApplicationScopeImpl.java +++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/scope/ApplicationScopeImpl.java @@ -48,13 +48,13 @@ public class ApplicationScopeImpl implements ApplicationScope { if ( this == o ) { return true; } - if ( !( o instanceof ApplicationScopeImpl ) ) { + if ( !( o instanceof ApplicationScope ) ) { return false; } - final ApplicationScopeImpl that = ( ApplicationScopeImpl ) o; + final ApplicationScope that = ( ApplicationScope ) o; - if ( !application.equals( that.application ) ) { + if ( !application.equals( that.getApplication() ) ) { return false; } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/bba08ddc/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/EdgeSerializationTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/EdgeSerializationTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/EdgeSerializationTest.java index 9c29d56..57391de 100644 --- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/EdgeSerializationTest.java +++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/EdgeSerializationTest.java @@ -27,6 +27,7 @@ import java.util.UUID; import org.junit.Before; import org.junit.ClassRule; +import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; import org.slf4j.Logger; @@ -656,6 +657,7 @@ public abstract class EdgeSerializationTest { * Test paging by resuming the search from the edge */ @Test + @Ignore("Kills embedded cassandra") public void pageIteration() throws ConnectionException { int size = graphFig.getScanPageSize() * 2; @@ -695,6 +697,7 @@ public abstract class EdgeSerializationTest { * edge types */ @Test + @Ignore("Kills embedded cassandra") public void testIteratorPaging() throws ConnectionException { http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/bba08ddc/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java index d9857f2..93f0e41 100644 --- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java +++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java @@ -42,6 +42,7 @@ import org.apache.usergrid.persistence.index.EntityIndexBatch; import org.apache.usergrid.persistence.index.IndexFig; import org.apache.usergrid.persistence.index.IndexScope; import org.apache.usergrid.persistence.index.query.CandidateResult; +import org.apache.usergrid.persistence.index.utils.IndexValidationUtils; import org.apache.usergrid.persistence.model.entity.Entity; import org.apache.usergrid.persistence.model.entity.Id; import org.apache.usergrid.persistence.model.field.ArrayField; @@ -91,15 +92,20 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch { private BulkRequestBuilder bulkRequest; + private final int autoFlushSize; - public EsEntityIndexBatchImpl( final ApplicationScope applicationScope, - final Client client, final IndexFig config, final Set<String> knownTypes ) { + private int count; + + + public EsEntityIndexBatchImpl( final ApplicationScope applicationScope, final Client client, final IndexFig config, + final Set<String> knownTypes, final int autoFlushSize ) { this.applicationScope = applicationScope; this.client = client; this.knownTypes = knownTypes; this.indexName = createIndexName( config.getIndexPrefix(), applicationScope ); this.refresh = config.isForcedRefresh(); + this.autoFlushSize = autoFlushSize; initBatch(); } @@ -107,6 +113,9 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch { @Override public EntityIndexBatch index( final IndexScope indexScope, final Entity entity ) { + + IndexValidationUtils.validateScopeMatch( indexScope, applicationScope ); + final String indexType = createCollectionScopeTypeName( indexScope ); if ( log.isDebugEnabled() ) { @@ -118,7 +127,7 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch { ValidationUtils.verifyEntityWrite( entity ); - initType(indexScope, indexType ); + initType( indexScope, indexType ); Map<String, Object> entityAsMap = entityToMap( entity ); @@ -135,12 +144,16 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch { bulkRequest.add( client.prepareIndex( indexName, indexType, indexId ).setSource( entityAsMap ) ); + maybeFlush(); + return this; } @Override - public EntityIndexBatch deindex(final IndexScope indexScope, final Id id, final UUID version ) { + public EntityIndexBatch deindex( final IndexScope indexScope, final Id id, final UUID version ) { + + IndexValidationUtils.validateScopeMatch( indexScope, applicationScope ); final String indexType = createCollectionScopeTypeName( indexScope ); @@ -158,6 +171,8 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch { log.debug( "Deindexed Entity with index id " + indexId ); + maybeFlush(); + return this; } @@ -165,14 +180,14 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch { @Override public EntityIndexBatch deindex( final IndexScope indexScope, final Entity entity ) { - return deindex(indexScope, entity.getId(), entity.getVersion() ); + return deindex( indexScope, entity.getId(), entity.getVersion() ); } @Override public EntityIndexBatch deindex( final IndexScope indexScope, final CandidateResult entity ) { - return deindex(indexScope, entity.getId(), entity.getVersion() ); + return deindex( indexScope, entity.getId(), entity.getVersion() ); } @@ -184,7 +199,6 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch { /** * Execute the request, check for errors, then re-init the batch for future use - * @param request */ private void execute( final BulkRequestBuilder request ) { final BulkResponse response = request.execute().actionGet(); @@ -199,7 +213,17 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch { @Override public void executeAndRefresh() { - execute(bulkRequest.setRefresh( true ) ); + execute( bulkRequest.setRefresh( true ) ); + } + + + private void maybeFlush() { + count++; + + if ( count % autoFlushSize == 0 ) { + execute(); + count = 0; + } } @@ -217,6 +241,8 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch { try { XContentBuilder mxcb = EsEntityIndexImpl.createDoubleStringIndexMapping( jsonBuilder(), typeName ); + + //TODO Dave can this be collapsed into the build as well? admin.indices().preparePutMapping( indexName ).setType( typeName ).setSource( mxcb ).execute().actionGet(); admin.indices().prepareGetMappings( indexName ).addTypes( typeName ).execute().actionGet(); http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/bba08ddc/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java index 077036a..56316d8 100644 --- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java +++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java @@ -134,7 +134,7 @@ public class EsEntityIndexImpl implements EntityIndex { @Override public EntityIndexBatch createBatch() { - return new EsEntityIndexBatchImpl( applicationScope, client, config, knownTypes ); + return new EsEntityIndexBatchImpl( applicationScope, client, config, knownTypes, 1000 ); } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/bba08ddc/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/utils/IndexValidationUtils.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/utils/IndexValidationUtils.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/utils/IndexValidationUtils.java index 899e7b0..d6080de 100644 --- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/utils/IndexValidationUtils.java +++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/utils/IndexValidationUtils.java @@ -19,6 +19,7 @@ package org.apache.usergrid.persistence.index.utils; +import org.apache.usergrid.persistence.core.scope.ApplicationScope; import org.apache.usergrid.persistence.index.IndexScope; import com.google.common.base.Preconditions; @@ -50,4 +51,13 @@ public class IndexValidationUtils { } + /** + * Validate the scope in the index matches the application scope + * @param indexScope + * @param scope + */ + public static void validateScopeMatch(final IndexScope indexScope,final ApplicationScope scope){ + Preconditions.checkArgument( scope.equals( indexScope ) ); + } + }