Repository: usergrid Updated Branches: refs/heads/release-2.1.1 dfaf344a0 -> b8f502f9d
Fix issues with skipping entities to be indexed. Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/b8f502f9 Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/b8f502f9 Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/b8f502f9 Branch: refs/heads/release-2.1.1 Commit: b8f502f9d4f0e2776b4f5c28bec66ec09cb8e12a Parents: dfaf344 Author: Michael Russo <[email protected]> Authored: Mon Apr 18 13:40:49 2016 +0100 Committer: Michael Russo <[email protected]> Committed: Mon Apr 18 13:40:49 2016 +0100 ---------------------------------------------------------------------- .../corepersistence/CpEntityManager.java | 2 +- .../corepersistence/CpRelationManager.java | 82 +++++++++++++++++--- .../corepersistence/index/IndexServiceImpl.java | 2 + .../service/ApplicationServiceImpl.java | 37 ++++++++- 4 files changed, 108 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/usergrid/blob/b8f502f9/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java index 73f5d5a..b74995a 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java @@ -781,7 +781,7 @@ public class CpEntityManager implements EntityManager { Preconditions.checkNotNull(entityRef, "entityRef cannot be null"); CpRelationManager relationManager = new CpRelationManager( managerCache, indexService, collectionService, - connectionService, this, entityManagerFig, applicationId, entityRef ); + connectionService, this, entityManagerFig, applicationId, indexSchemaCacheFactory, entityRef ); return relationManager; } http://git-wip-us.apache.org/repos/asf/usergrid/blob/b8f502f9/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java index 5596ab4..4082b45 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java @@ -19,7 +19,11 @@ package org.apache.usergrid.corepersistence; import java.util.*; +import org.apache.usergrid.corepersistence.index.IndexSchemaCache; +import org.apache.usergrid.corepersistence.index.IndexSchemaCacheFactory; import org.apache.usergrid.corepersistence.results.IdQueryExecutor; +import org.apache.usergrid.persistence.map.MapManager; +import org.apache.usergrid.persistence.map.MapScope; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.util.Assert; @@ -75,13 +79,7 @@ import static org.apache.usergrid.corepersistence.util.CpNamingUtils.createColle import static org.apache.usergrid.corepersistence.util.CpNamingUtils.createConnectionEdge; import static org.apache.usergrid.corepersistence.util.CpNamingUtils.createConnectionSearchByEdge; import static org.apache.usergrid.corepersistence.util.CpNamingUtils.getNameFromEdgeType; -import static org.apache.usergrid.persistence.Schema.COLLECTION_ROLES; -import static org.apache.usergrid.persistence.Schema.PROPERTY_INACTIVITY; -import static org.apache.usergrid.persistence.Schema.PROPERTY_NAME; -import static org.apache.usergrid.persistence.Schema.PROPERTY_TITLE; -import static org.apache.usergrid.persistence.Schema.TYPE_ENTITY; -import static org.apache.usergrid.persistence.Schema.TYPE_ROLE; -import static org.apache.usergrid.persistence.Schema.getDefaultSchema; +import static org.apache.usergrid.persistence.Schema.*; import static org.apache.usergrid.utils.ClassUtils.cast; import static org.apache.usergrid.utils.InflectionUtils.singularize; import static org.apache.usergrid.utils.MapUtils.addMapSet; @@ -109,6 +107,8 @@ public class CpRelationManager implements RelationManager { private final AsyncEventService indexService; + private final IndexSchemaCacheFactory indexSchemaCacheFactory; + private final CollectionService collectionService; private final ConnectionService connectionService; @@ -119,6 +119,7 @@ public class CpRelationManager implements RelationManager { final ConnectionService connectionService, final EntityManager em, final EntityManagerFig entityManagerFig, final UUID applicationId, + final IndexSchemaCacheFactory indexSchemaCacheFactory, final EntityRef headEntity) { @@ -158,6 +159,8 @@ public class CpRelationManager implements RelationManager { .format( "cpHeadEntity cannot be null for entity id %s, app id %s", entityId.getUuid(), applicationId ) ); this.indexService = indexService; + this.indexSchemaCacheFactory = indexSchemaCacheFactory; + } @@ -408,10 +411,20 @@ public class CpRelationManager implements RelationManager { //reverse return gm.writeEdge( reverseEdge ).doOnNext( reverseEdgeWritten -> { - indexService.queueNewEdge( applicationScope, cpHeadEntity, reverseEdge ); + + if ( !skipIndexingForType( cpHeadEntity.getId().getType() ) ) { + + indexService.queueNewEdge(applicationScope, cpHeadEntity, reverseEdge); + } + } ); } ).doOnCompleted( () -> { - indexService.queueNewEdge( applicationScope, memberEntity, edge ); + + if ( !skipIndexingForType( memberEntity.getId().getType() ) ) { + indexService.queueNewEdge(applicationScope, memberEntity, edge); + } + + if ( logger.isDebugEnabled() ) { logger.debug( "Added entity {}:{} to collection {}", itemRef.getUuid().toString(), itemRef.getType(), collectionName ); @@ -533,8 +546,10 @@ public class CpRelationManager implements RelationManager { //TODO: this should not happen here, needs to go to SQS //indexProducer.put(batch).subscribe(); - indexService.queueEntityDelete(applicationScope,memberEntity.getId()); + if ( !skipIndexingForType( memberEntity.getId().getType() ) ) { + indexService.queueEntityDelete(applicationScope, memberEntity.getId()); + } // special handling for roles collection of a group if ( headEntity.getType().equals( Group.ENTITY_TYPE ) ) { @@ -709,8 +724,11 @@ public class CpRelationManager implements RelationManager { gm.writeEdge(edge).toBlocking().lastOrDefault(null); //throw an exception if this fails - indexService.queueNewEdge( applicationScope, targetEntity, edge ); + if ( !skipIndexingForType( targetEntity.getId().getType() ) ) { + + indexService.queueNewEdge(applicationScope, targetEntity, edge); + } // remove any duplicate edges (keeps the duplicate edge with same timestamp) removeDuplicateEdgesAsync(gm, edge); @@ -787,7 +805,14 @@ public class CpRelationManager implements RelationManager { //delete all the edges and queue their processing gm.loadEdgeVersions( search ).flatMap( returnedEdge -> gm.markEdge( returnedEdge ) ) - .doOnNext( returnedEdge -> indexService.queueDeleteEdge( applicationScope, returnedEdge ) ).toBlocking() + .doOnNext( returnedEdge -> { + + if ( !skipIndexingForType( returnedEdge.getSourceNode().getType() ) || !skipIndexingForType( returnedEdge.getTargetNode().getType() ) ) { + + indexService.queueDeleteEdge(applicationScope, returnedEdge); + } + + }).toBlocking() .lastOrDefault( null ); } @@ -1057,4 +1082,37 @@ public class CpRelationManager implements RelationManager { } + private boolean skipIndexingForType( String type ) { + + boolean skipIndexing = false; + + MapManager mm = getMapManagerForTypes(); + IndexSchemaCache indexSchemaCache = indexSchemaCacheFactory.getInstance( mm ); + String collectionName = Schema.defaultCollectionName( type ); + Optional<Map> collectionIndexingSchema = indexSchemaCache.getCollectionSchema( collectionName ); + + if ( collectionIndexingSchema.isPresent()) { + Map jsonMapData = collectionIndexingSchema.get(); + final ArrayList fields = (ArrayList) jsonMapData.get( "fields" ); + if ( fields.size() == 1 && fields.get(0).equals("none")) { + skipIndexing = true; + } + } + + return skipIndexing; + } + + /** + * Get the map manager for uuid mapping + */ + private MapManager getMapManagerForTypes() { + Id mapOwner = new SimpleId( applicationId, TYPE_APPLICATION ); + + final MapScope ms = CpNamingUtils.getEntityTypeMapScope(mapOwner); + + MapManager mm = managerCache.getMapManager( ms ); + + return mm; + } + } http://git-wip-us.apache.org/repos/asf/usergrid/blob/b8f502f9/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java index ad997c8..a33453e 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java @@ -226,6 +226,8 @@ public class IndexServiceImpl implements IndexService { return Optional.absent(); } + fieldsToKeep.remove("none"); + defaultProperties.addAll( fieldsToKeep ); } else { http://git-wip-us.apache.org/repos/asf/usergrid/blob/b8f502f9/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ApplicationServiceImpl.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ApplicationServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ApplicationServiceImpl.java index 7c9fee3..ca2b9a2 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ApplicationServiceImpl.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ApplicationServiceImpl.java @@ -19,9 +19,12 @@ */ package org.apache.usergrid.corepersistence.service; +import com.google.common.base.Optional; import com.google.inject.Inject; import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService; import org.apache.usergrid.corepersistence.asyncevents.EventBuilder; +import org.apache.usergrid.corepersistence.index.IndexSchemaCache; +import org.apache.usergrid.corepersistence.index.IndexSchemaCacheFactory; import org.apache.usergrid.corepersistence.rx.impl.AllEntityIdsObservable; import org.apache.usergrid.corepersistence.util.CpNamingUtils; import org.apache.usergrid.persistence.Schema; @@ -39,6 +42,9 @@ import org.apache.usergrid.persistence.model.entity.SimpleId; import org.apache.usergrid.utils.InflectionUtils; import rx.Observable; +import java.util.ArrayList; +import java.util.Map; + import static org.apache.usergrid.corepersistence.util.CpNamingUtils.createGraphOperationTimestamp; import static org.apache.usergrid.persistence.Schema.TYPE_APPLICATION; @@ -53,6 +59,8 @@ public class ApplicationServiceImpl implements ApplicationService{ private final EventBuilder eventBuilder; private final MapManagerFactory mapManagerFactory; private final GraphManagerFactory graphManagerFactory; + private final IndexSchemaCacheFactory indexSchemaCacheFactory; + @Inject @@ -61,7 +69,8 @@ public class ApplicationServiceImpl implements ApplicationService{ AsyncEventService asyncEventService, EventBuilder eventBuilder, MapManagerFactory mapManagerFactory, - GraphManagerFactory graphManagerFactory + GraphManagerFactory graphManagerFactory, + IndexSchemaCacheFactory indexSchemaCacheFactory ){ this.allEntityIdsObservable = allEntityIdsObservable; @@ -70,6 +79,7 @@ public class ApplicationServiceImpl implements ApplicationService{ this.eventBuilder = eventBuilder; this.mapManagerFactory = mapManagerFactory; this.graphManagerFactory = graphManagerFactory; + this.indexSchemaCacheFactory = indexSchemaCacheFactory; } @@ -119,7 +129,11 @@ public class ApplicationServiceImpl implements ApplicationService{ private Id deleteAsync(MapManager mapManager, ApplicationScope applicationScope, Id entityId ) { try { //Step 4 && 5 - asyncEventService.queueEntityDelete(applicationScope, entityId); + + if ( !skipIndexingForType( entityId.getType(), applicationScope ) ) { + + asyncEventService.queueEntityDelete(applicationScope, entityId); + } //Step 6 //delete from our UUID index mapManager.delete(entityId.getUuid().toString()); @@ -130,6 +144,25 @@ public class ApplicationServiceImpl implements ApplicationService{ } + private boolean skipIndexingForType( String type, ApplicationScope applicationScope ) { + + boolean skipIndexing = false; + + MapManager mm = getMapManagerForTypes(applicationScope); + IndexSchemaCache indexSchemaCache = indexSchemaCacheFactory.getInstance( mm ); + String collectionName = Schema.defaultCollectionName( type ); + Optional<Map> collectionIndexingSchema = indexSchemaCache.getCollectionSchema( collectionName ); + + if ( collectionIndexingSchema.isPresent()) { + Map jsonMapData = collectionIndexingSchema.get(); + final ArrayList fields = (ArrayList) jsonMapData.get( "fields" ); + if ( fields.size() == 1 && fields.get(0).equals("none")) { + skipIndexing = true; + } + } + + return skipIndexing; + } /** * Get the map manager for uuid mapping
