Collection clear using version changes
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/24e443b2 Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/24e443b2 Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/24e443b2 Branch: refs/heads/collectionDelete Commit: 24e443b2f3e7968f5c24c1bf422ca5b9bfd16304 Parents: e4ccd5e Author: Mike Dunker <[email protected]> Authored: Thu Aug 17 09:00:25 2017 -0700 Committer: Mike Dunker <[email protected]> Committed: Thu Aug 17 09:00:25 2017 -0700 ---------------------------------------------------------------------- .../usergrid/corepersistence/CoreModule.java | 4 + .../corepersistence/CpEntityManager.java | 17 +- .../corepersistence/CpRelationManager.java | 2 +- .../asyncevents/AsyncEventService.java | 2 +- .../asyncevents/AsyncEventServiceImpl.java | 101 +++++++----- .../asyncevents/AsyncIndexProvider.java | 10 ++ .../CollectionClearTooSoonException.java | 39 +++++ .../CollectionDeleteTooSoonException.java | 39 ----- .../asyncevents/EventBuilder.java | 34 ++-- .../asyncevents/EventBuilderImpl.java | 154 ++++++++++++++----- .../asyncevents/model/AsyncEvent.java | 2 +- .../asyncevents/model/CollectionClearEvent.java | 58 +++++++ .../model/CollectionDeleteEvent.java | 58 ------- .../asyncevents/model/EntityDeleteEvent.java | 14 -- .../index/CollectionClearServiceImpl.java | 2 +- .../index/CollectionSettingsImpl.java | 6 +- .../index/CollectionVersionFig.java | 2 +- .../index/CollectionVersionManager.java | 4 +- .../index/CollectionVersionManagerImpl.java | 8 +- .../index/CollectionVersionUtil.java | 80 ---------- .../index/CollectionVersionUtils.java | 100 ++++++++++++ .../corepersistence/index/IndexServiceImpl.java | 3 + .../read/search/CandidateEntityFilter.java | 5 +- .../persistence/entities/Application.java | 34 ---- .../apache/usergrid/utils/InflectionUtils.java | 8 +- .../org/apache/usergrid/utils/StringUtils.java | 4 + .../index/AsyncEventServiceImplTest.java | 9 +- .../corepersistence/index/IndexServiceTest.java | 20 ++- .../usergrid/persistence/RebuildIndexTest.java | 4 +- .../graph/impl/GraphManagerImpl.java | 5 +- .../persistence/graph/impl/SimpleEdge.java | 10 ++ .../graph/impl/SimpleMarkedEdge.java | 19 ++- .../persistence/model/entity/Entity.java | 6 + .../usergrid/persistence/model/entity/Id.java | 6 + .../persistence/model/entity/SimpleId.java | 19 +++ .../persistence/model/util/CollectionUtils.java | 101 ++++++++++++ .../index/impl/DeIndexOperation.java | 6 + .../index/impl/EsEntityIndexBatchImpl.java | 9 +- .../persistence/index/impl/IndexEdgeImpl.java | 10 ++ .../persistence/index/impl/IndexOperation.java | 5 + .../persistence/index/impl/IndexingUtils.java | 63 +++++--- .../persistence/index/impl/SearchEdgeImpl.java | 9 ++ .../persistence/index/impl/EntityIndexTest.java | 108 +++++++++---- .../persistence/index/impl/GeoPagingTest.java | 7 +- .../index/impl/IndexingUtilsTest.java | 13 +- .../queue/src/test/resources/qakka.properties | 2 +- .../rest/applications/CollectionResource.java | 6 +- .../rest/applications/ServiceResource.java | 5 +- .../CollectionClearTooSoonExceptionMapper.java | 44 ++++++ .../CollectionDeleteTooSoonExceptionMapper.java | 44 ------ .../usergrid/rest/system/IndexResource.java | 3 + .../collection/CollectionClearTest.java | 121 +++++++++++---- .../services/AbstractConnectionsService.java | 10 ++ .../usergrid/services/ServiceManager.java | 81 +++++++--- 54 files changed, 1015 insertions(+), 520 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/usergrid/blob/24e443b2/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java index 6a93af5..ba1a924 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java @@ -41,7 +41,9 @@ import org.apache.usergrid.persistence.core.migration.data.MigrationPlugin; import org.apache.usergrid.persistence.core.rx.RxTaskScheduler; import org.apache.usergrid.persistence.core.rx.RxTaskSchedulerImpl; import org.apache.usergrid.persistence.core.scope.ApplicationScope; +import org.apache.usergrid.persistence.graph.GraphManagerFactory; import org.apache.usergrid.persistence.graph.guice.GraphModule; +import org.apache.usergrid.persistence.graph.serialization.impl.GraphManagerFactoryImpl; import org.apache.usergrid.persistence.graph.serialization.impl.migration.GraphNode; import org.apache.usergrid.persistence.index.guice.IndexModule; import org.apache.usergrid.persistence.token.guice.TokenModule; @@ -183,6 +185,8 @@ public class CoreModule extends AbstractModule { bind( ApplicationService.class ).to( ApplicationServiceImpl.class ); bind( StatusService.class ).to( StatusServiceImpl.class ); + + bind( GraphManagerFactory.class ).to(GraphManagerFactoryImpl.class); } http://git-wip-us.apache.org/repos/asf/usergrid/blob/24e443b2/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java index e192939..9435ed6 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java @@ -47,6 +47,7 @@ import org.apache.usergrid.persistence.collection.FieldSet; import org.apache.usergrid.persistence.collection.exception.WriteUniqueVerifyException; import org.apache.usergrid.persistence.core.metrics.MetricsFactory; import org.apache.usergrid.persistence.core.scope.ApplicationScope; +import org.apache.usergrid.persistence.model.util.CollectionUtils; import org.apache.usergrid.persistence.entities.*; import org.apache.usergrid.persistence.exceptions.DuplicateUniquePropertyExistsException; import org.apache.usergrid.persistence.exceptions.EntityNotFoundException; @@ -741,9 +742,9 @@ public class CpEntityManager implements EntityManager { Set<String> existingCollections = new HashSet<>(); for (String existingCollection : getRelationManager( getApplication() ).getCollections()) { - if (Application.isCustomCollectionName(existingCollection)) { + if (CollectionUtils.isCustomCollectionOrEntityName(existingCollection)) { // check for correct version - VersionedCollectionName v = CollectionVersionUtil.parseVersionedName(existingCollection); + VersionedCollectionName v = CollectionVersionUtils.parseVersionedName(existingCollection); CollectionVersionManager cvm = collectionVersionManagerFactory.getInstance( new CollectionScopeImpl(getApplication().asId(), v.getCollectionName()) ); @@ -784,7 +785,7 @@ public class CpEntityManager implements EntityManager { if ( !Schema.isAssociatedEntityType( collectionName ) ) { Long count = counts.get( APPLICATION_COLLECTION + collectionName ); - String unversionedCollectionName = CollectionVersionUtil.getBaseCollectionName(collectionName); + String unversionedCollectionName = CollectionVersionUtils.getBaseCollectionName(collectionName); Map<String, Object> entry = new HashMap<String, Object>(); entry.put( "count", count != null ? count : 0 ); entry.put( "type", singularize( unversionedCollectionName ) ); @@ -825,7 +826,7 @@ public class CpEntityManager implements EntityManager { StringField uniqueLookupRepairField = new StringField( propertyName, aliasType.toString()); Observable<FieldSet> fieldSetObservable = ecm.getEntitiesFromFields( - Inflector.getInstance().singularize( collectionType ), Arrays.<Field>asList( uniqueLookupRepairField ), uniqueIndexRepair); + singularize( collectionType ), Arrays.<Field>asList( uniqueLookupRepairField ), uniqueIndexRepair); if(fieldSetObservable == null){ @@ -849,7 +850,7 @@ public class CpEntityManager implements EntityManager { } fieldSet = ecm.getEntitiesFromFields( - Inflector.getInstance().singularize( collectionType ), + singularize( collectionType ), Collections.singletonList(uniqueLookupRepairField), uniqueIndexRepair).toBlocking().last(); } @@ -870,7 +871,7 @@ public class CpEntityManager implements EntityManager { StringField uniqueLookupRepairField = new StringField( propertyName, aliasType); Observable<FieldSet> fieldSetObservable = ecm.getEntitiesFromFields( - Inflector.getInstance().singularize( collectionType ), + singularize( collectionType ), Collections.singletonList(uniqueLookupRepairField), uniqueIndexRepair); if(fieldSetObservable == null){ @@ -895,7 +896,7 @@ public class CpEntityManager implements EntityManager { } fieldSet = ecm.getEntitiesFromFields( - Inflector.getInstance().singularize( collectionType ), + singularize( collectionType ), Collections.singletonList(uniqueLookupRepairField), uniqueIndexRepair).toBlocking().last(); } @@ -2491,7 +2492,7 @@ public class CpEntityManager implements EntityManager { final Object propertyValue ) { //convert to a string, that's what we store - final Id results = ecm.getIdField( Inflector.getInstance().singularize( collectionName ), new StringField( + final Id results = ecm.getIdField( singularize( collectionName ), new StringField( propertyName, propertyValue.toString() ) ).toBlocking() .lastOrDefault( null ); return results; } http://git-wip-us.apache.org/repos/asf/usergrid/blob/24e443b2/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java index c02ca7d..a8b309c 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java @@ -491,7 +491,7 @@ public class CpRelationManager implements RelationManager { return; } } - // handles normal app collection deletes + // handles normal app collection delete em.delete( itemRef ); return; } http://git-wip-us.apache.org/repos/asf/usergrid/blob/24e443b2/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java index 7ce208f..917ad5b 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java @@ -101,7 +101,7 @@ public interface AsyncEventService extends ReIndexAction { * @param collectionScope * @param collectionVersion */ - void queueCollectionDelete(final CollectionScope collectionScope, final String collectionVersion); + void queueCollectionClear(final CollectionScope collectionScope, final String collectionVersion); /** * current queue depth http://git-wip-us.apache.org/repos/asf/usergrid/blob/24e443b2/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java index e33865e..883b784 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java @@ -41,6 +41,11 @@ import org.apache.usergrid.persistence.collection.serialization.impl.migration.E import org.apache.usergrid.persistence.core.metrics.MetricsFactory; import org.apache.usergrid.persistence.core.rx.RxTaskScheduler; import org.apache.usergrid.persistence.core.scope.ApplicationScope; +import org.apache.usergrid.persistence.graph.GraphManager; +import org.apache.usergrid.persistence.graph.GraphManagerFactory; +import org.apache.usergrid.persistence.graph.impl.SimpleEdge; +import org.apache.usergrid.persistence.model.entity.SimpleId; +import org.apache.usergrid.persistence.model.util.CollectionUtils; import org.apache.usergrid.persistence.graph.Edge; import org.apache.usergrid.persistence.index.EntityIndex; import org.apache.usergrid.persistence.index.EntityIndexFactory; @@ -58,6 +63,7 @@ import org.apache.usergrid.persistence.model.util.UUIDGenerator; import org.apache.usergrid.persistence.queue.*; import org.apache.usergrid.persistence.queue.impl.LegacyQueueScopeImpl; import org.apache.usergrid.persistence.queue.impl.SNSQueueManagerImpl; +import org.apache.usergrid.corepersistence.index.CollectionVersionUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import rx.Observable; @@ -74,8 +80,6 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; import java.util.stream.Stream; -import static org.apache.commons.lang.StringUtils.isNotEmpty; - /** * TODO, this whole class is becoming a nightmare. @@ -120,9 +124,11 @@ public class AsyncEventServiceImpl implements AsyncEventService { private final EntityCollectionManagerFactory entityCollectionManagerFactory; private final IndexLocationStrategyFactory indexLocationStrategyFactory; private final EntityIndexFactory entityIndexFactory; + private final CollectionVersionManagerFactory collectionVersionManagerFactory; private final EventBuilder eventBuilder; private final RxTaskScheduler rxTaskScheduler; private final AllEntityIdsObservable allEntityIdsObservable; + private final GraphManagerFactory graphManagerFactory; private final Timer readTimer; private final Timer writeTimer; @@ -160,6 +166,8 @@ public class AsyncEventServiceImpl implements AsyncEventService { final MapManagerFactory mapManagerFactory, final LegacyQueueFig queueFig, final CollectionVersionFig collectionVersionFig, + final CollectionVersionManagerFactory collectionVersionManagerFactory, + final GraphManagerFactory graphManagerFactory, final AllEntityIdsObservable allEntityIdsObservable, @EventExecutionScheduler final RxTaskScheduler rxTaskScheduler ) { @@ -204,6 +212,8 @@ public class AsyncEventServiceImpl implements AsyncEventService { this.indexProcessorFig = indexProcessorFig; this.queueFig = queueFig; this.collectionVersionFig = collectionVersionFig; + this.collectionVersionManagerFactory = collectionVersionManagerFactory; + this.graphManagerFactory = graphManagerFactory; this.allEntityIdsObservable = allEntityIdsObservable; this.writeTimer = metricsFactory.getTimer(AsyncEventServiceImpl.class, "async_event.write"); @@ -471,9 +481,9 @@ public class AsyncEventServiceImpl implements AsyncEventService { single = handleDeIndexOldVersionEvent((DeIndexOldVersionsEvent) event); - } else if (event instanceof CollectionDeleteEvent) { + } else if (event instanceof CollectionClearEvent) { - handleCollectionDelete(message); + handleCollectionClear(message); } else { @@ -483,7 +493,7 @@ public class AsyncEventServiceImpl implements AsyncEventService { if( !(event instanceof ElasticsearchIndexEvent) && !(event instanceof InitializeApplicationIndexEvent) - && !(event instanceof CollectionDeleteEvent) + && !(event instanceof CollectionClearEvent) && single.isEmpty() ){ logger.warn("No index operation messages came back from event processing for eventType: {}, msgId: {}, msgBody: {}", event.getClass().getSimpleName(), message.getMessageId(), message.getStringBody()); @@ -542,8 +552,10 @@ public class AsyncEventServiceImpl implements AsyncEventService { final Entity entity, long updatedAfter) { - logger.trace("Offering EntityIndexEvent for {}:{}", - entity.getId().getUuid(), entity.getId().getType()); + if (logger.isTraceEnabled()) { + logger.trace("Offering EntityIndexEvent for {}:{}", + entity.getId().getUuid(), entity.getId().getType()); + } offer(new EntityIndexEvent(queueFig.getPrimaryRegion(), new EntityIdScope(applicationScope, entity.getId()), updatedAfter)); @@ -569,6 +581,9 @@ public class AsyncEventServiceImpl implements AsyncEventService { final ApplicationScope applicationScope = entityIdScope.getApplicationScope(); final Id entityId = entityIdScope.getId(); final long updatedAfter = entityIndexEvent.getUpdatedAfter(); + if (logger.isTraceEnabled()) { + logger.trace("handleEntityIndexUpdate entityId={}, type={}", entityId, entityId.getType()); + } final EntityIndexOperation entityIndexOperation = new EntityIndexOperation( applicationScope, entityId, updatedAfter); @@ -584,8 +599,10 @@ public class AsyncEventServiceImpl implements AsyncEventService { final Entity entity, final Edge newEdge) { - logger.trace("Offering EdgeIndexEvent for edge type {} entity {}:{}", - newEdge.getType(), entity.getId().getUuid(), entity.getId().getType()); + if (logger.isTraceEnabled()) { + logger.trace("Offering EdgeIndexEvent for edge type {} entity {}:{}", + newEdge.getType(), entity.getId().getUuid(), entity.getId().getType()); + } offer( new EdgeIndexEvent( queueFig.getPrimaryRegion(), applicationScope, entity.getId(), newEdge )); @@ -602,6 +619,9 @@ public class AsyncEventServiceImpl implements AsyncEventService { String.format("Event Type for handleEdgeIndex must be EDGE_INDEX, got %s", event.getClass())); final EdgeIndexEvent edgeIndexEvent = ( EdgeIndexEvent ) event; + if (logger.isTraceEnabled()) { + logger.trace("handleEdgeIndex entityId={} targetNode={}", edgeIndexEvent.getEntityId(), edgeIndexEvent.getEdge().getTargetNode()); + } final EntityCollectionManager ecm = entityCollectionManagerFactory.createCollectionManager( edgeIndexEvent.getApplicationScope() ); @@ -682,7 +702,9 @@ public class AsyncEventServiceImpl implements AsyncEventService { //send to the topic so all regions index the batch - logger.trace("Offering ElasticsearchIndexEvent for message {}", newMessageId ); + if (logger.isTraceEnabled()) { + logger.trace("Offering ElasticsearchIndexEvent for message {}", newMessageId); + } offerTopic( elasticsearchIndexEvent, queueType ); } @@ -839,88 +861,95 @@ public class AsyncEventServiceImpl implements AsyncEventService { final EntityDeleteEvent entityDeleteEvent = ( EntityDeleteEvent ) event; final ApplicationScope applicationScope = entityDeleteEvent.getEntityIdScope().getApplicationScope(); final Id entityId = entityDeleteEvent.getEntityIdScope().getId(); - final boolean markedOnly = entityDeleteEvent.markedOnly(); if (logger.isDebugEnabled()) { logger.debug("Deleting entity id from index in app scope {} with entityId {}", applicationScope, entityId); } - final IndexOperationMessage indexOperationMessage = markedOnly ? - eventBuilder.buildEntityDelete( applicationScope, entityId ) : - eventBuilder.buildEntityDeleteAllVersions( applicationScope, entityId ); + final IndexOperationMessage indexOperationMessage = eventBuilder.buildEntityDelete( applicationScope, entityId ); return indexOperationMessage; } @Override - public void queueCollectionDelete(final CollectionScope collectionScope, final String collectionVersion) { + public void queueCollectionClear(final CollectionScope collectionScope, final String collectionVersion) { if (logger.isDebugEnabled()) { - logger.debug("Offering CollectionDeleteEvent for application={}, collectionName={}, collectionVersion={}", + logger.debug("Offering CollectionClearEvent for application={}, collectionName={}, collectionVersion={}", collectionScope.getApplication().getUuid(), collectionScope.getCollectionName(), collectionVersion); } // sent in region (not offerTopic) as the delete IO happens in-region, then queues a multi-region de-index op - offer(new CollectionDeleteEvent(queueFig.getPrimaryRegion(), collectionScope, collectionVersion), + offer(new CollectionClearEvent(queueFig.getPrimaryRegion(), collectionScope, collectionVersion), AsyncEventQueueType.DELETE); } - private void handleCollectionDelete(final LegacyQueueMessage message) { + private void handleCollectionClear(final LegacyQueueMessage message) { - Preconditions.checkNotNull(message, "Queue Message cannot be null for handleCollectionDelete"); + Preconditions.checkNotNull(message, "Queue Message cannot be null for handleCollectionClear"); final AsyncEvent event = (AsyncEvent) message.getBody(); - Preconditions.checkNotNull(event, "QueueMessage body cannot be null for handleCollectionDelete" ); - Preconditions.checkArgument( event instanceof CollectionDeleteEvent, - String.format( "Event Type for handleCollectionDelete must be COLLECTION_DELETE, got %s", event.getClass() ) ); + Preconditions.checkNotNull(event, "QueueMessage body cannot be null for handleCollectionClear" ); + Preconditions.checkArgument( event instanceof CollectionClearEvent, + String.format( "Event Type for handleCollectionClear must be COLLECTION_CLEAR, got %s", event.getClass() ) ); - final CollectionDeleteEvent collectionDeleteEvent = ( CollectionDeleteEvent ) event; - final CollectionScope collectionScope = collectionDeleteEvent.getCollectionScope(); + final CollectionClearEvent collectionClearEvent = (CollectionClearEvent) event; + final CollectionScope collectionScope = collectionClearEvent.getCollectionScope(); if (collectionScope == null) { - logger.error("CollectionDeleteEvent received with null collectionScope"); + logger.error("CollectionClearEvent received with null collectionScope"); // ack message, nothing more to do return; } final UUID applicationID = collectionScope.getApplication().getUuid(); if (applicationID == null) { - logger.error("CollectionDeleteEvent collectionScope has null application"); + logger.error("CollectionClearEvent collectionScope has null application"); // ack message, nothing more to do return; } - String collectionVersion = collectionDeleteEvent.getCollectionVersion(); + String collectionVersion = collectionClearEvent.getCollectionVersion(); if (collectionVersion == null) { collectionVersion = ""; } final ApplicationScope applicationScope = CpNamingUtils.getApplicationScope(applicationID); + final String versionedCollectionName = - CollectionVersionUtil.buildVersionedNameString(collectionScope.getCollectionName(), - collectionVersion, false); + CollectionVersionUtils.buildVersionedNameString(collectionScope.getCollectionName(), + collectionVersion, false, false); + logger.info("collectionClear: versionedCollectionName:{}", versionedCollectionName); + final EntityCollectionManager ecm = + entityCollectionManagerFactory.createCollectionManager( applicationScope ); + + final GraphManager gm = + graphManagerFactory.createEdgeManager(applicationScope); final AtomicInteger count = new AtomicInteger(); int maxDeletes = collectionVersionFig.getDeletesPerEvent(); if (logger.isDebugEnabled()) { - logger.debug("handleCollectionDelete: applicationScope={} collectionName={} maxDeletes={}", applicationScope.toString(), versionedCollectionName, maxDeletes); + logger.debug("handleCollectionClear: applicationScope={} collectionName={} maxDeletes={}", applicationScope.toString(), versionedCollectionName, maxDeletes); } allEntityIdsObservable.getEdgesToEntities(Observable.just(applicationScope), Optional.fromNullable(CpNamingUtils.getEdgeTypeFromCollectionName(versionedCollectionName.toLowerCase())), Optional.absent()) //.takeWhile(edgeScope-> count.intValue() < maxDeletes) .take(maxDeletes) + .doOnNext(edgeScope -> { + // mark the entity for deletion + ecm.mark( edgeScope.getEdge().getTargetNode(), null ).mergeWith( gm.markNode( edgeScope.getEdge().getTargetNode(), CpNamingUtils.createGraphOperationTimestamp() ) ).toBlocking().last(); + }) .doOnNext(edgeScope-> { + //logger.info("edgeScope sourceNode:{} targetNode:{} type:{}", edgeScope.getEdge().getSourceNode().toString(), edgeScope.getEdge().getTargetNode().toString(), edgeScope.getEdge().getType()); - offer(new EntityDeleteEvent(queueFig.getPrimaryRegion(), - new EntityIdScope(applicationScope, edgeScope.getEdge().getTargetNode()),false), - AsyncEventQueueType.DELETE); + queueEntityDelete(applicationScope, edgeScope.getEdge().getTargetNode()); count.incrementAndGet(); }).toBlocking().lastOrDefault(null); - logger.info("handleCollectionDelete: queued {} entity deletes for deleted collection", count.intValue()); + logger.info("handleCollectionClear: queued {} entity deletes for cleared collection", count.intValue()); if (count.intValue() >= maxDeletes) { - // requeue collection delete for next chunk of deletes - offer (new CollectionDeleteEvent(queueFig.getPrimaryRegion(), collectionScope, collectionVersion), + // requeue collection clear for next chunk + offer (new CollectionClearEvent(queueFig.getPrimaryRegion(), collectionScope, collectionVersion), AsyncEventQueueType.DELETE); } } http://git-wip-us.apache.org/repos/asf/usergrid/blob/24e443b2/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java index 31fcd6d..ccfb574 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java @@ -21,12 +21,14 @@ package org.apache.usergrid.corepersistence.asyncevents; import org.apache.usergrid.corepersistence.index.CollectionVersionFig; +import org.apache.usergrid.corepersistence.index.CollectionVersionManagerFactory; import org.apache.usergrid.corepersistence.index.IndexLocationStrategyFactory; import org.apache.usergrid.corepersistence.index.IndexProcessorFig; import org.apache.usergrid.corepersistence.rx.impl.AllEntityIdsObservable; import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory; import org.apache.usergrid.persistence.core.rx.RxTaskScheduler; import org.apache.usergrid.persistence.core.metrics.MetricsFactory; +import org.apache.usergrid.persistence.graph.GraphManagerFactory; import org.apache.usergrid.persistence.index.EntityIndexFactory; import org.apache.usergrid.persistence.index.impl.IndexProducer; import org.apache.usergrid.persistence.queue.LegacyQueueManager; @@ -61,6 +63,8 @@ public class AsyncIndexProvider implements Provider<AsyncEventService> { private final MapManagerFactory mapManagerFactory; private final LegacyQueueFig queueFig; private final CollectionVersionFig collectionVersionFig; + private final CollectionVersionManagerFactory collectionVersionManagerFactory; + private final GraphManagerFactory graphManagerFactory; private final AllEntityIdsObservable allEntityIdsObservable; private AsyncEventService asyncEventService; @@ -79,6 +83,8 @@ public class AsyncIndexProvider implements Provider<AsyncEventService> { final MapManagerFactory mapManagerFactory, final LegacyQueueFig queueFig, final CollectionVersionFig collectionVersionFig, + final CollectionVersionManagerFactory collectionVersionManagerFactory, + final GraphManagerFactory graphManagerFactory, final AllEntityIdsObservable allEntityIdsObservable) { this.indexProcessorFig = indexProcessorFig; @@ -93,6 +99,8 @@ public class AsyncIndexProvider implements Provider<AsyncEventService> { this.mapManagerFactory = mapManagerFactory; this.queueFig = queueFig; this.collectionVersionFig = collectionVersionFig; + this.collectionVersionManagerFactory = collectionVersionManagerFactory; + this.graphManagerFactory = graphManagerFactory; this.allEntityIdsObservable = allEntityIdsObservable; } @@ -125,6 +133,8 @@ public class AsyncIndexProvider implements Provider<AsyncEventService> { mapManagerFactory, queueFig, collectionVersionFig, + collectionVersionManagerFactory, + graphManagerFactory, allEntityIdsObservable, rxTaskScheduler ); http://git-wip-us.apache.org/repos/asf/usergrid/blob/24e443b2/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/CollectionClearTooSoonException.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/CollectionClearTooSoonException.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/CollectionClearTooSoonException.java new file mode 100644 index 0000000..314dcfc --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/CollectionClearTooSoonException.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.corepersistence.asyncevents; + +public class CollectionClearTooSoonException extends RuntimeException { + + private final long timeLastDeleted; + private final long timeRequiredBeforeDeleteMsec; + + public CollectionClearTooSoonException(final long timeLastDeleted, final long timeRequiredBeforeDeleteMsec) { + this.timeLastDeleted = timeLastDeleted; + this.timeRequiredBeforeDeleteMsec = timeRequiredBeforeDeleteMsec; + } + + public long getTimeLastDeleted() { + return timeLastDeleted; + } + + public long getTimeRequiredBeforeDeleteMsec() { + return timeRequiredBeforeDeleteMsec; + } +} http://git-wip-us.apache.org/repos/asf/usergrid/blob/24e443b2/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/CollectionDeleteTooSoonException.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/CollectionDeleteTooSoonException.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/CollectionDeleteTooSoonException.java deleted file mode 100644 index bd37d46..0000000 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/CollectionDeleteTooSoonException.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.usergrid.corepersistence.asyncevents; - -public class CollectionDeleteTooSoonException extends RuntimeException { - - private final long timeLastDeleted; - private final long timeRequiredBeforeDeleteMsec; - - public CollectionDeleteTooSoonException(final long timeLastDeleted, final long timeRequiredBeforeDeleteMsec) { - this.timeLastDeleted = timeLastDeleted; - this.timeRequiredBeforeDeleteMsec = timeRequiredBeforeDeleteMsec; - } - - public long getTimeLastDeleted() { - return timeLastDeleted; - } - - public long getTimeRequiredBeforeDeleteMsec() { - return timeRequiredBeforeDeleteMsec; - } -} http://git-wip-us.apache.org/repos/asf/usergrid/blob/24e443b2/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java index 081b3bc..7c68521 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java @@ -28,6 +28,7 @@ import org.apache.usergrid.persistence.collection.MvccLogEntry; import org.apache.usergrid.persistence.core.scope.ApplicationScope; import org.apache.usergrid.persistence.graph.Edge; import org.apache.usergrid.persistence.graph.MarkedEdge; +import org.apache.usergrid.persistence.index.SearchEdge; import org.apache.usergrid.persistence.index.impl.IndexOperationMessage; import org.apache.usergrid.persistence.model.entity.Entity; import org.apache.usergrid.persistence.model.entity.Id; @@ -65,15 +66,6 @@ public interface EventBuilder { */ IndexOperationMessage buildEntityDelete(ApplicationScope applicationScope, Id entityId ); - /** - * Return a bin with 2 observable streams for entity delete. This deletes all versions -- used only for an old - * collection version. Does not require versions to be marked for deletion. - * @param applicationScope - * @param entityId - * @return - */ - IndexOperationMessage buildEntityDeleteAllVersions(ApplicationScope applicationScope, Id entityId ); - /** @@ -95,6 +87,30 @@ public interface EventBuilder { Id entityId, UUID markedVersion ); /** + * Get id that includes collection version. + */ + Id getCollectionVersionedId(ApplicationScope applicationScope, Id id, boolean forceVersion); + + + /** + * Get entity that includes collection version. + */ + Entity getCollectionVersionedEntity(ApplicationScope applicationScope, Entity entity, boolean forceVersion); + + + /** + * Get edge that includes collection version. + */ + Edge getCollectionVersionedEdge(ApplicationScope applicationScope, Edge edge, boolean forceVersion); + + + /** + * Get edge that includes collection version. + */ + SearchEdge getCollectionVersionedSearchEdge(ApplicationScope applicationScope, SearchEdge searchEdge, boolean forceVersion); + + + /** * A bean to hold both our observables so the caller can choose the subscription mechanism. Note that * indexOperationMessages should be subscribed and completed BEFORE the getEntitiesDeleted is subscribed */ http://git-wip-us.apache.org/repos/asf/usergrid/blob/24e443b2/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java index ade6818..049cd4e 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java @@ -24,12 +24,22 @@ import java.util.ArrayList; import java.util.List; import java.util.UUID; +import org.apache.usergrid.corepersistence.index.*; +import org.apache.usergrid.persistence.model.util.CollectionUtils; +import org.apache.usergrid.persistence.graph.MarkedEdge; +import org.apache.usergrid.persistence.graph.impl.SimpleEdge; +import org.apache.usergrid.persistence.graph.impl.SimpleMarkedEdge; +import org.apache.usergrid.persistence.index.IndexEdge; +import org.apache.usergrid.persistence.index.SearchEdge; +import org.apache.usergrid.persistence.index.impl.IndexEdgeImpl; +import org.apache.usergrid.persistence.index.impl.SearchEdgeImpl; +import org.apache.usergrid.persistence.model.entity.SimpleId; +import org.apache.usergrid.corepersistence.index.CollectionVersionUtils; +import org.apache.usergrid.utils.InflectionUtils; import org.apache.usergrid.utils.UUIDUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.usergrid.corepersistence.index.EntityIndexOperation; -import org.apache.usergrid.corepersistence.index.IndexService; import org.apache.usergrid.persistence.Schema; import org.apache.usergrid.persistence.collection.EntityCollectionManager; import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory; @@ -62,19 +72,93 @@ public class EventBuilderImpl implements EventBuilder { private final EntityCollectionManagerFactory entityCollectionManagerFactory; private final GraphManagerFactory graphManagerFactory; private final SerializationFig serializationFig; + private final CollectionVersionManagerFactory collectionVersionManagerFactory; @Inject public EventBuilderImpl( final IndexService indexService, final EntityCollectionManagerFactory entityCollectionManagerFactory, - final GraphManagerFactory graphManagerFactory, final SerializationFig serializationFig ) { + final GraphManagerFactory graphManagerFactory, final SerializationFig serializationFig, + final CollectionVersionManagerFactory collectionVersionManagerFactory) { this.indexService = indexService; this.entityCollectionManagerFactory = entityCollectionManagerFactory; this.graphManagerFactory = graphManagerFactory; this.serializationFig = serializationFig; + this.collectionVersionManagerFactory = collectionVersionManagerFactory; } + @Override + public Id getCollectionVersionedId(ApplicationScope applicationScope, Id id, boolean forceVersion) { + + String currentCollectionName = InflectionUtils.pluralize(id.getType()); + + // if already versioned, or not a custom (versionable) collection, we're done + if (!CollectionUtils.isCustomCollectionOrEntityName(currentCollectionName) || + CollectionVersionUtils.isVersionedName(currentCollectionName)) { + return id; + } + + CollectionVersionManager cvm = collectionVersionManagerFactory.getInstance( + new CollectionScopeImpl(applicationScope.getApplication(), currentCollectionName) + ); + String currentCollectionVersion = cvm.getCollectionVersion(true); + + String newEntityType = CollectionVersionUtils.buildVersionedNameString(id.getType(), currentCollectionVersion, + false, forceVersion); + + return new SimpleId(id.getUuid(), newEntityType); + } + + @Override + public Entity getCollectionVersionedEntity(final ApplicationScope applicationScope, final Entity entity, boolean forceVersion) { + + return new Entity(getCollectionVersionedId(applicationScope, entity.getId(), forceVersion), entity.getVersion() ); + + } + + @Override + public Edge getCollectionVersionedEdge(final ApplicationScope applicationScope, final Edge edge, boolean forceVersion) { + Edge returnEdge; + if (edge instanceof SimpleMarkedEdge) { + MarkedEdge markedEdge = (MarkedEdge)edge; + returnEdge = new SimpleMarkedEdge( + getCollectionVersionedId(applicationScope, markedEdge.getSourceNode(), forceVersion), + markedEdge.getType(), + getCollectionVersionedId(applicationScope, markedEdge.getTargetNode(), forceVersion), + markedEdge.getTimestamp(), + markedEdge.isDeleted(), + markedEdge.isSourceNodeDelete(), + markedEdge.isTargetNodeDeleted() + ); + } else { // SimpleEdge + returnEdge = new SimpleEdge(getCollectionVersionedId(applicationScope, edge.getSourceNode(), forceVersion), edge.getType(), + getCollectionVersionedId(applicationScope, edge.getTargetNode(), forceVersion), edge.getTimestamp()); + } + + return returnEdge; + } + + @Override + public SearchEdge getCollectionVersionedSearchEdge(final ApplicationScope applicationScope, final SearchEdge searchEdge, boolean forceVersion) { + SearchEdge returnSearchEdge; + if (searchEdge instanceof IndexEdgeImpl) { + IndexEdge indexEdge = (IndexEdge)searchEdge; + returnSearchEdge = new IndexEdgeImpl( + getCollectionVersionedId(applicationScope, indexEdge.getNodeId(), forceVersion), + indexEdge.getEdgeName(), + indexEdge.getNodeType(), + indexEdge.getTimestamp() + ); + } else { // SearchEdgeImpl + returnSearchEdge = new SearchEdgeImpl(getCollectionVersionedId(applicationScope, searchEdge.getNodeId(), forceVersion), + searchEdge.getEdgeName(), searchEdge.getNodeType()); + + } + + return returnSearchEdge; + } + @Override public Observable<IndexOperationMessage> buildNewEdge( final ApplicationScope applicationScope, final Entity entity, @@ -126,45 +210,46 @@ public class EventBuilderImpl implements EventBuilder { //Does the queue entityDelete mark the entity then immediately does to the deleteEntityIndex. seems like //it'll need to be pushed up higher so we can do the marking that isn't async or does it not matter? - private IndexOperationMessage buildEntityDeleteCommon(final ApplicationScope applicationScope, final Id entityId, - boolean markedOnly) { + @Override + public IndexOperationMessage buildEntityDelete(final ApplicationScope applicationScope, final Id entityId ) { if (logger.isDebugEnabled()) { - logger.debug("Deleting entity id ({} versions) from index in app scope {} with entityId {}", - markedOnly ? "marked" : "all", applicationScope, entityId); + logger.debug("Deleting entity id (marked versions) from index in app scope {} with entityId {}", + applicationScope, entityId); } final EntityCollectionManager ecm = entityCollectionManagerFactory.createCollectionManager( applicationScope ); final GraphManager gm = graphManagerFactory.createEdgeManager( applicationScope ); - //TODO USERGRID-1123: Implement so we don't iterate logs twice (latest DELETED version, then to get all DELETED) - - MvccLogEntry mostRecentToDelete = markedOnly ? - ecm.getVersionsFromMaxToMin( entityId, UUIDUtils.newTimeUUID() ).toBlocking() - .firstOrDefault( null, mvccLogEntry -> mvccLogEntry.getState() == MvccLogEntry.State.DELETED ) : - ecm.getVersionsFromMaxToMin( entityId, UUIDUtils.newTimeUUID() ).toBlocking() - .firstOrDefault( null ); + MvccLogEntry mostRecentToDelete = + ecm.getVersionsFromMaxToMin( entityId, UUIDUtils.newTimeUUID() ) + .toBlocking() + .firstOrDefault( null, mvccLogEntry -> mvccLogEntry.getState() == MvccLogEntry.State.DELETED ); +// logger.info("mostRecent stage={} entityId={} version={} state={}", +// mostRecentToDelete.getStage().name(), mostRecentToDelete.getEntityId(), +// mostRecentToDelete.getVersion().toString(), mostRecentToDelete.getState().name()); - // if only marked entities should be deleted and nothing is marked, then abort - if(markedOnly && mostRecentToDelete == null){ + if (mostRecentToDelete == null) { + logger.info("No entity versions to delete for id {}", entityId.toString()); + } + // if nothing is marked, then abort + if(mostRecentToDelete == null){ return new IndexOperationMessage(); } final List<MvccLogEntry> logEntries = new ArrayList<>(); Observable<MvccLogEntry> mvccLogEntryListObservable = ecm.getVersionsFromMaxToMin( entityId, UUIDUtils.newTimeUUID() ); - if(markedOnly){ - mvccLogEntryListObservable - .filter(mvccLogEntry -> mvccLogEntry.getState() == MvccLogEntry.State.DELETED); - } - mvccLogEntryListObservable - .filter( mvccLogEntry-> mvccLogEntry.getVersion().timestamp() <= mostRecentToDelete.getVersion().timestamp() ) - .buffer( serializationFig.getBufferSize() ) - .doOnNext( buffer -> ecm.delete( buffer ) ) - .doOnNext(mvccLogEntries -> { - logEntries.addAll(mvccLogEntries); - }).toBlocking().lastOrDefault(null); + mvccLogEntryListObservable + .filter( mvccLogEntry-> mvccLogEntry.getVersion().timestamp() <= mostRecentToDelete.getVersion().timestamp() ) + .buffer( serializationFig.getBufferSize() ) + .doOnNext( buffer -> ecm.delete( buffer ) ) + .doOnNext(mvccLogEntries -> { + logEntries.addAll(mvccLogEntries); + }).toBlocking().lastOrDefault(null); + + //logger.info("logEntries size={}", logEntries.size()); IndexOperationMessage combined = new IndexOperationMessage(); @@ -186,7 +271,9 @@ public class EventBuilderImpl implements EventBuilder { // Further comments using the example of deleting "server1" from the above example. gm.compactNode(entityId).doOnNext(markedEdge -> { - logger.debug("Processing deleted edge for de-indexing {}", markedEdge); + if (logger.isDebugEnabled()) { + logger.debug("Processing deleted edge for de-indexing {}", markedEdge); + } // if the edge was for a connection where the entity to be deleted is the source node, we need to load // the target node's versions so that all versions of connections to that entity can be de-indexed @@ -223,17 +310,6 @@ public class EventBuilderImpl implements EventBuilder { } @Override - public IndexOperationMessage buildEntityDelete(final ApplicationScope applicationScope, final Id entityId ) { - return buildEntityDeleteCommon(applicationScope, entityId, true); - } - - // this deletes all versions of an entity, only used for collection delete - @Override - public IndexOperationMessage buildEntityDeleteAllVersions(final ApplicationScope applicationScope, final Id entityId ) { - return buildEntityDeleteCommon(applicationScope, entityId, false); - } - - @Override public Observable<IndexOperationMessage> buildEntityIndex( final EntityIndexOperation entityIndexOperation ) { final ApplicationScope applicationScope = entityIndexOperation.getApplicationScope(); http://git-wip-us.apache.org/repos/asf/usergrid/blob/24e443b2/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java index 0ea0fdc..9e444f3 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java @@ -45,7 +45,7 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo; @JsonSubTypes.Type( value = InitializeApplicationIndexEvent.class, name = "initializeApplicationIndexEvent" ), @JsonSubTypes.Type( value = ElasticsearchIndexEvent.class, name = "elasticsearchIndexEvent" ), @JsonSubTypes.Type( value = DeIndexOldVersionsEvent.class, name = "deIndexOldVersionsEvent" ), - @JsonSubTypes.Type( value = CollectionDeleteEvent.class, name = "collectionDeleteEvent" ) + @JsonSubTypes.Type( value = CollectionClearEvent.class, name = "collectionDeleteEvent" ) } ) http://git-wip-us.apache.org/repos/asf/usergrid/blob/24e443b2/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/CollectionClearEvent.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/CollectionClearEvent.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/CollectionClearEvent.java new file mode 100644 index 0000000..8dd7d97 --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/CollectionClearEvent.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.corepersistence.asyncevents.model; + +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.usergrid.corepersistence.index.CollectionScope; + +/** + * Event that will signal to queue up entity deletes for a collection clear. + */ +public final class CollectionClearEvent extends AsyncEvent { + + + @JsonProperty + protected CollectionScope collectionScope; + + @JsonProperty + protected String collectionVersion; + + /** + * Do not delete! Needed for Jackson + */ + @SuppressWarnings( "unused" ) + public CollectionClearEvent() { + super(); + } + + public CollectionClearEvent(String sourceRegion, CollectionScope collectionScope, String collectionVersion) { + super(sourceRegion); + this.collectionScope = collectionScope; + this.collectionVersion = collectionVersion; + } + + public CollectionScope getCollectionScope() { + return collectionScope; + } + + public String getCollectionVersion() { + return collectionVersion; + } +} http://git-wip-us.apache.org/repos/asf/usergrid/blob/24e443b2/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/CollectionDeleteEvent.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/CollectionDeleteEvent.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/CollectionDeleteEvent.java deleted file mode 100644 index 9fc978c..0000000 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/CollectionDeleteEvent.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.usergrid.corepersistence.asyncevents.model; - -import com.fasterxml.jackson.annotation.JsonProperty; -import org.apache.usergrid.corepersistence.index.CollectionScope; - -/** - * Event that will signal to queue up entity deletes for a collection delete. - */ -public final class CollectionDeleteEvent extends AsyncEvent { - - - @JsonProperty - protected CollectionScope collectionScope; - - @JsonProperty - protected String collectionVersion; - - /** - * Do not delete! Needed for Jackson - */ - @SuppressWarnings( "unused" ) - public CollectionDeleteEvent() { - super(); - } - - public CollectionDeleteEvent(String sourceRegion, CollectionScope collectionScope, String collectionVersion) { - super(sourceRegion); - this.collectionScope = collectionScope; - this.collectionVersion = collectionVersion; - } - - public CollectionScope getCollectionScope() { - return collectionScope; - } - - public String getCollectionVersion() { - return collectionVersion; - } -} http://git-wip-us.apache.org/repos/asf/usergrid/blob/24e443b2/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EntityDeleteEvent.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EntityDeleteEvent.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EntityDeleteEvent.java index aa6a15b..db13290 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EntityDeleteEvent.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EntityDeleteEvent.java @@ -27,33 +27,19 @@ import org.apache.usergrid.persistence.collection.serialization.impl.migration.E */ public final class EntityDeleteEvent extends AsyncEvent { - @JsonProperty protected EntityIdScope entityIdScope; - @JsonProperty - protected boolean markedOnly; - public EntityDeleteEvent() { super(); } public EntityDeleteEvent(String sourceRegion, EntityIdScope entityIdScope) { - this(sourceRegion, entityIdScope, true); - } - - public EntityDeleteEvent(String sourceRegion, EntityIdScope entityIdScope, boolean markedOnly) { super(sourceRegion); this.entityIdScope = entityIdScope; - this.markedOnly = markedOnly; } - public EntityIdScope getEntityIdScope() { return entityIdScope; } - - public boolean markedOnly() { - return markedOnly; - } } http://git-wip-us.apache.org/repos/asf/usergrid/blob/24e443b2/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionClearServiceImpl.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionClearServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionClearServiceImpl.java index ff64d6a..2717d85 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionClearServiceImpl.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionClearServiceImpl.java @@ -53,7 +53,7 @@ public class CollectionClearServiceImpl implements CollectionClearService { applicationID.toString(), baseCollectionName, oldVersion, collectionVersionManager.getCollectionVersion(false)); // queue up delete of old version entities - asyncEventService.queueCollectionDelete(scope, oldVersion); + asyncEventService.queueCollectionClear(scope, oldVersion); } @Override http://git-wip-us.apache.org/repos/asf/usergrid/blob/24e443b2/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionSettingsImpl.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionSettingsImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionSettingsImpl.java index 74acd09..5e71cec 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionSettingsImpl.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionSettingsImpl.java @@ -51,7 +51,7 @@ public class CollectionSettingsImpl implements CollectionSettings { public Optional<Map<String, Object>> getCollectionSettings(final String collectionName ) { // collectionName may be a versioned collection name -- get the base name - String baseCollectionName = CollectionVersionUtil.parseVersionedName(collectionName).getCollectionName(); + String baseCollectionName = CollectionVersionUtils.parseVersionedName(collectionName).getCollectionName(); String settings; @@ -80,7 +80,7 @@ public class CollectionSettingsImpl implements CollectionSettings { public void putCollectionSettings(final String collectionName, final String collectionSchema ){ // collectionName may be a versioned collection name -- get the base name - String baseCollectionName = CollectionVersionUtil.parseVersionedName(collectionName).getCollectionName(); + String baseCollectionName = CollectionVersionUtils.parseVersionedName(collectionName).getCollectionName(); mapManager.putString( baseCollectionName, collectionSchema ); cache.put(scope, collectionSchema); @@ -91,7 +91,7 @@ public class CollectionSettingsImpl implements CollectionSettings { public void deleteCollectionSettings(final String collectionName){ // collectionName may be a versioned collection name -- get the base name - String baseCollectionName = CollectionVersionUtil.parseVersionedName(collectionName).getCollectionName(); + String baseCollectionName = CollectionVersionUtils.parseVersionedName(collectionName).getCollectionName(); mapManager.delete( baseCollectionName ); cache.invalidate( scope ); http://git-wip-us.apache.org/repos/asf/usergrid/blob/24e443b2/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionVersionFig.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionVersionFig.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionVersionFig.java index 3bb75c7..43ad3bb 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionVersionFig.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionVersionFig.java @@ -43,7 +43,7 @@ public interface CollectionVersionFig extends GuicyFig { int getCacheTimeout(); @Key(TIME_BETWEEN_DELETES_MS) - @Default("60000") + @Default("0") long getTimeBetweenDeletes(); @Key(DELETES_PER_EVENT) http://git-wip-us.apache.org/repos/asf/usergrid/blob/24e443b2/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionVersionManager.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionVersionManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionVersionManager.java index 9768a55..c3cc1ca 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionVersionManager.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionVersionManager.java @@ -18,7 +18,7 @@ package org.apache.usergrid.corepersistence.index; -import org.apache.usergrid.corepersistence.asyncevents.CollectionDeleteTooSoonException; +import org.apache.usergrid.corepersistence.asyncevents.CollectionClearTooSoonException; public interface CollectionVersionManager { @@ -29,7 +29,7 @@ public interface CollectionVersionManager { String getVersionedCollectionName(final boolean bypassCache); - String updateCollectionVersion() throws CollectionDeleteTooSoonException; + String updateCollectionVersion() throws CollectionClearTooSoonException; Long getTimeLastChanged(); http://git-wip-us.apache.org/repos/asf/usergrid/blob/24e443b2/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionVersionManagerImpl.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionVersionManagerImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionVersionManagerImpl.java index c5bb417..b467242 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionVersionManagerImpl.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionVersionManagerImpl.java @@ -18,7 +18,7 @@ package org.apache.usergrid.corepersistence.index; import com.google.inject.Inject; -import org.apache.usergrid.corepersistence.asyncevents.CollectionDeleteTooSoonException; +import org.apache.usergrid.corepersistence.asyncevents.CollectionClearTooSoonException; import org.apache.usergrid.persistence.map.MapManager; import org.apache.usergrid.persistence.model.util.UUIDGenerator; import org.slf4j.Logger; @@ -78,19 +78,19 @@ public class CollectionVersionManagerImpl implements CollectionVersionManager { @Override public String getVersionedCollectionName(final boolean bypassCache) { String collectionVersion = getCollectionVersion(bypassCache); - return CollectionVersionUtil.buildVersionedNameString(collectionName, collectionVersion, false); + return CollectionVersionUtils.buildVersionedNameString(collectionName, collectionVersion, false); } // returns old collection version @Override - public String updateCollectionVersion() throws CollectionDeleteTooSoonException { + public String updateCollectionVersion() throws CollectionClearTooSoonException { // check for time last changed Long timeLastChanged = getTimeLastChanged(); long timeBetweenDeletes = collectionVersionFig.getTimeBetweenDeletes(); if (timeLastChanged != null) { if (System.currentTimeMillis() - timeLastChanged < timeBetweenDeletes) { // too soon - throw new CollectionDeleteTooSoonException(timeLastChanged, timeBetweenDeletes); + throw new CollectionClearTooSoonException(timeLastChanged, timeBetweenDeletes); } } http://git-wip-us.apache.org/repos/asf/usergrid/blob/24e443b2/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionVersionUtil.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionVersionUtil.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionVersionUtil.java deleted file mode 100644 index 46e4e09..0000000 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionVersionUtil.java +++ /dev/null @@ -1,80 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.usergrid.corepersistence.index; - - -import com.amazonaws.util.StringUtils; -import com.google.common.base.Preconditions; - -import java.util.regex.Pattern; - -public class CollectionVersionUtil { - private static final String VERSIONED_NAME_SEPARATOR = "%~!~%"; - - public static VersionedCollectionName parseVersionedName(String versionedCollectionNameString) throws IllegalArgumentException { - Preconditions.checkNotNull(versionedCollectionNameString, "collection name string is required"); - String collectionName; - String collectionVersion; - try { - String[] parts = versionedCollectionNameString.split(Pattern.quote(VERSIONED_NAME_SEPARATOR)); - if (parts.length == 2) { - collectionName = parts[0]; - collectionVersion = parts[1]; - } else if (parts.length == 1) { - collectionName = parts[0]; - collectionVersion = ""; - } else { - throw new IllegalArgumentException("Invalid format for versioned collection, versionedCollectionNameString=" + versionedCollectionNameString); - } - } catch (Exception e) { - throw new IllegalArgumentException("Unable to parse versioned collection, versionedCollectionNameString=" + versionedCollectionNameString, e); - } - return new VersionedCollectionNameImpl(collectionName, collectionVersion); - } - - public static String getBaseCollectionName(String versionedCollectionNameString) throws IllegalArgumentException { - return parseVersionedName(versionedCollectionNameString).getCollectionName(); - } - - public static boolean collectionNameHasVersion(String collectionNameString) { - try { - VersionedCollectionName parsedName = parseVersionedName(collectionNameString); - return !StringUtils.isNullOrEmpty(parsedName.getCollectionVersion()); - } - catch (Exception e) { - return false; - } - } - - public static String buildVersionedNameString(final String baseName, final String collectionVersion, - final boolean validateBaseName) throws IllegalArgumentException { - Preconditions.checkNotNull(baseName, "base name is required"); - if (validateBaseName && baseName.contains(VERSIONED_NAME_SEPARATOR)) { - throw new IllegalArgumentException("Cannot build versioned name using a base name that already includes the version separator"); - } - if (collectionVersion == null || collectionVersion == "") { - return baseName; - } - return baseName + VERSIONED_NAME_SEPARATOR + collectionVersion; - } - - public static VersionedCollectionName createVersionedName(String baseName, String collectionVersion) { - return new VersionedCollectionNameImpl(baseName, collectionVersion); - } - -} http://git-wip-us.apache.org/repos/asf/usergrid/blob/24e443b2/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionVersionUtils.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionVersionUtils.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionVersionUtils.java new file mode 100644 index 0000000..241a8ad --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionVersionUtils.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.usergrid.corepersistence.index; + + +import com.google.common.base.Preconditions; +import org.apache.usergrid.utils.StringUtils; + +import java.util.regex.Pattern; + +import static org.apache.usergrid.persistence.model.util.CollectionUtils.VERSIONED_NAME_SEPARATOR; + +public class CollectionVersionUtils { + + public static VersionedCollectionName parseVersionedName(String versionedCollectionNameString) throws IllegalArgumentException { + Preconditions.checkNotNull(versionedCollectionNameString, "collection name string is required"); + String collectionName; + String collectionVersion; + try { + String[] parts = versionedCollectionNameString.split(Pattern.quote(VERSIONED_NAME_SEPARATOR)); + if (parts.length == 2) { + collectionName = parts[0]; + collectionVersion = parts[1]; + } else if (parts.length == 1) { + collectionName = parts[0]; + collectionVersion = ""; + } else { + throw new IllegalArgumentException("Invalid format for versioned collection, versionedCollectionNameString=" + versionedCollectionNameString); + } + } catch (Exception e) { + throw new IllegalArgumentException("Unable to parse versioned collection, versionedCollectionNameString=" + versionedCollectionNameString, e); + } + return new VersionedCollectionNameImpl(collectionName, collectionVersion); + } + + public static String getBaseCollectionName(String versionedCollectionNameString) throws IllegalArgumentException { + return parseVersionedName(versionedCollectionNameString).getCollectionName(); + } + + public static String getCollectionVersion(String versionedCollectionNameString) throws IllegalArgumentException { + return parseVersionedName(versionedCollectionNameString).getCollectionVersion(); + } + + public static boolean collectionNameHasVersion(String collectionNameString) { + try { + VersionedCollectionName parsedName = parseVersionedName(collectionNameString); + return !StringUtils.isNullOrEmpty(parsedName.getCollectionVersion()); + } + catch (Exception e) { + return false; + } + } + + public static boolean isVersionedName(String name) { + try { + return name.contains(VERSIONED_NAME_SEPARATOR); + } + catch (Exception e) { + return false; + } + } + + public static String buildVersionedNameString(final String baseName, final String collectionVersion, + final boolean validateBaseName) throws IllegalArgumentException { + return buildVersionedNameString(baseName, collectionVersion, validateBaseName, false); + } + + public static String buildVersionedNameString(final String baseName, final String collectionVersion, + final boolean validateBaseName, final boolean forceVersion) { + Preconditions.checkNotNull(baseName, "base name is required"); + if (validateBaseName && baseName.contains(VERSIONED_NAME_SEPARATOR)) { + throw new IllegalArgumentException("Cannot build versioned name using a base name that already includes the version separator"); + } + if (!forceVersion && collectionVersion == null || collectionVersion == "") { + return baseName; + } + return baseName + VERSIONED_NAME_SEPARATOR + collectionVersion; + + } + + public static VersionedCollectionName createVersionedName(final String baseName, final String collectionVersion) { + return new VersionedCollectionNameImpl(baseName, collectionVersion); + } + +} http://git-wip-us.apache.org/repos/asf/usergrid/blob/24e443b2/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java index c8dfc31..cc99a98 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java @@ -254,6 +254,9 @@ public class IndexServiceImpl implements IndexService { public Observable<IndexOperationMessage> deIndexEdge(final ApplicationScope applicationScope, final Edge edge, final Id entityId, final UUID entityVersion){ + if (logger.isTraceEnabled()) { + logger.trace("deIndexEdge edge={} entityId={} entityVersion={}", edge.toString(), entityId.toString(), entityVersion.toString()); + } final EntityIndex ei = entityIndexFactory.createEntityIndex(indexLocationStrategyFactory.getIndexLocationStrategy(applicationScope)); final EntityIndexBatch entityBatch = ei.createBatch(); entityBatch.deindex(generateScopeFromSource( edge ), entityId, entityVersion); http://git-wip-us.apache.org/repos/asf/usergrid/blob/24e443b2/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/CandidateEntityFilter.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/CandidateEntityFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/CandidateEntityFilter.java index 7770436..2da67f7 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/CandidateEntityFilter.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/CandidateEntityFilter.java @@ -25,6 +25,7 @@ import java.util.*; import org.apache.usergrid.corepersistence.index.IndexLocationStrategyFactory; import org.apache.usergrid.persistence.index.*; import org.apache.usergrid.persistence.index.impl.IndexProducer; +import org.apache.usergrid.persistence.model.entity.SimpleId; import org.apache.usergrid.persistence.model.field.DistanceField; import org.apache.usergrid.persistence.model.field.DoubleField; import org.apache.usergrid.persistence.model.field.EntityObjectField; @@ -112,7 +113,7 @@ public class CandidateEntityFilter extends AbstractFilter<FilterResult<Candidate candidates.flatMap(candidatesList -> { Collection<SelectFieldMapping> mappings = candidatesList.get(0).getFields(); Observable<EntitySet> entitySets = Observable.from(candidatesList) - .map(candidateEntry -> candidateEntry.getCandidateResult().getId()).toList() + .map(candidateEntry -> (Id)new SimpleId(candidateEntry.getCandidateResult().getId(), false)).toList() .flatMap(idList -> entityCollectionManager.load(idList)); //now we have a collection, validate our canidate set is correct. return entitySets.map( @@ -273,7 +274,7 @@ public class CandidateEntityFilter extends AbstractFilter<FilterResult<Candidate final CandidateResult candidateResult = candidate.getCandidateResult(); final boolean isGeo = candidateResult instanceof GeoCandidateResult; final SearchEdge searchEdge = candidate.getSearchEdge(); - final Id candidateId = candidateResult.getId(); + final Id candidateId = new SimpleId(candidateResult.getId(), false); final UUID candidateVersion = candidateResult.getVersion(); http://git-wip-us.apache.org/repos/asf/usergrid/blob/24e443b2/stack/core/src/main/java/org/apache/usergrid/persistence/entities/Application.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/entities/Application.java b/stack/core/src/main/java/org/apache/usergrid/persistence/entities/Application.java index 0a4360f..2bea0cb 100644 --- a/stack/core/src/main/java/org/apache/usergrid/persistence/entities/Application.java +++ b/stack/core/src/main/java/org/apache/usergrid/persistence/entities/Application.java @@ -40,40 +40,6 @@ public class Application extends TypedEntity implements Serializable { public static final String ENTITY_TYPE = "application"; - public static final String COLLECTION_USERS = "users"; - - public static final String COLLECTION_GROUPS = "groups"; - - public static final String COLLECTION_ASSETS = "assets"; - - public static final String COLLECTION_ACTIVITIES = "activities"; - - public static final String COLLECTION_EVENTS = "events"; - - public static final String COLLECTION_FOLDERS = "folders"; - - public static final String COLLECTION_DEVICES = "devices"; - - public static final String COLLECTION_NOTIFICATIONS = "notifications"; - - public static final String COLLECTION_ROLES = "roles"; - - public static boolean isCustomCollectionName(String collectionName) { - switch (collectionName.toLowerCase()) { - case COLLECTION_USERS: - case COLLECTION_GROUPS: - case COLLECTION_ASSETS: - case COLLECTION_ACTIVITIES: - case COLLECTION_EVENTS: - case COLLECTION_FOLDERS: - case COLLECTION_DEVICES: - case COLLECTION_NOTIFICATIONS: - case COLLECTION_ROLES: - return false; - } - return true; - } - @EntityProperty(indexed = true, fulltextIndexed = false, required = true, mutable = false, aliasProperty = true, basic = true) protected String name; http://git-wip-us.apache.org/repos/asf/usergrid/blob/24e443b2/stack/core/src/main/java/org/apache/usergrid/utils/InflectionUtils.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/utils/InflectionUtils.java b/stack/core/src/main/java/org/apache/usergrid/utils/InflectionUtils.java index dde2f4f..6929e8d 100644 --- a/stack/core/src/main/java/org/apache/usergrid/utils/InflectionUtils.java +++ b/stack/core/src/main/java/org/apache/usergrid/utils/InflectionUtils.java @@ -17,7 +17,7 @@ package org.apache.usergrid.utils; -import org.apache.usergrid.corepersistence.index.CollectionVersionUtil; +import org.apache.usergrid.corepersistence.index.CollectionVersionUtils; import org.apache.usergrid.corepersistence.index.VersionedCollectionName; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -29,17 +29,17 @@ public class InflectionUtils { private static VersionedCollectionName parseName(Object word) { String name = word.toString().trim(); try { - return CollectionVersionUtil.parseVersionedName(name); + return CollectionVersionUtils.parseVersionedName(name); } catch (Exception e) { logger.error("parseName(): failed to parse the versioned name: {}", name); - return CollectionVersionUtil.createVersionedName(name, ""); + return CollectionVersionUtils.createVersionedName(name, ""); } } private static String getVersionedName(String name, String version) { try { - return CollectionVersionUtil.buildVersionedNameString(name, version, true); + return CollectionVersionUtils.buildVersionedNameString(name, version, true); } catch (Exception e) { // if versioned invalid, return name http://git-wip-us.apache.org/repos/asf/usergrid/blob/24e443b2/stack/core/src/main/java/org/apache/usergrid/utils/StringUtils.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/utils/StringUtils.java b/stack/core/src/main/java/org/apache/usergrid/utils/StringUtils.java index 6bb44d8..4eff293 100644 --- a/stack/core/src/main/java/org/apache/usergrid/utils/StringUtils.java +++ b/stack/core/src/main/java/org/apache/usergrid/utils/StringUtils.java @@ -160,6 +160,10 @@ public class StringUtils extends org.apache.commons.lang.StringUtils { return obj instanceof String; } + public static Boolean isNullOrEmpty(String s) { + return (s == null || s.equals("")); + } + public static String readClasspathFileAsString( String filePath ) { try { http://git-wip-us.apache.org/repos/asf/usergrid/blob/24e443b2/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AsyncEventServiceImplTest.java ---------------------------------------------------------------------- diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AsyncEventServiceImplTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AsyncEventServiceImplTest.java index c9bba02..9d4aca6 100644 --- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AsyncEventServiceImplTest.java +++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AsyncEventServiceImplTest.java @@ -31,6 +31,7 @@ import org.apache.usergrid.persistence.core.aws.NoAWSCredsRule; import org.apache.usergrid.persistence.core.metrics.MetricsFactory; import org.apache.usergrid.persistence.core.rx.RxTaskScheduler; import org.apache.usergrid.persistence.core.test.UseModules; +import org.apache.usergrid.persistence.graph.GraphManagerFactory; import org.apache.usergrid.persistence.index.EntityIndexFactory; import org.apache.usergrid.persistence.index.impl.EsRunner; import org.apache.usergrid.persistence.index.impl.IndexProducer; @@ -89,11 +90,17 @@ public class AsyncEventServiceImplTest extends AsyncIndexServiceTest { public CollectionVersionFig collectionVersionFig; @Inject + public CollectionVersionManagerFactory collectionVersionManagerFactory; + + @Inject + public GraphManagerFactory graphManagerFactory; + + @Inject public AllEntityIdsObservable allEntityIdsObservable; @Override protected AsyncEventService getAsyncEventService() { - return new AsyncEventServiceImpl( queueManagerFactory, indexProcessorFig, indexProducer, metricsFactory, entityCollectionManagerFactory, indexLocationStrategyFactory, entityIndexFactory, eventBuilder, mapManagerFactory, queueFig, collectionVersionFig, allEntityIdsObservable, rxTaskScheduler ); + return new AsyncEventServiceImpl( queueManagerFactory, indexProcessorFig, indexProducer, metricsFactory, entityCollectionManagerFactory, indexLocationStrategyFactory, entityIndexFactory, eventBuilder, mapManagerFactory, queueFig, collectionVersionFig, collectionVersionManagerFactory, graphManagerFactory, allEntityIdsObservable, rxTaskScheduler ); }
