delete collections via API, currently uses utility queue
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/7d3eb647 Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/7d3eb647 Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/7d3eb647 Branch: refs/heads/collectionDelete Commit: 7d3eb647d86d56bc3ec1c780ee65d348ca28f0fd Parents: a6fee78 Author: Mike Dunker <[email protected]> Authored: Fri Jul 7 08:03:50 2017 -0700 Committer: Mike Dunker <[email protected]> Committed: Fri Jul 7 08:03:50 2017 -0700 ---------------------------------------------------------------------- .../usergrid/corepersistence/CoreModule.java | 6 + .../corepersistence/CpEntityManager.java | 46 ++++++-- .../corepersistence/CpEntityManagerFactory.java | 12 +- .../asyncevents/AsyncEventService.java | 8 ++ .../asyncevents/AsyncEventServiceImpl.java | 113 +++++++++++++++++-- .../asyncevents/AsyncIndexProvider.java | 12 +- .../CollectionDeleteTooSoonException.java | 39 +++++++ .../asyncevents/EventBuilder.java | 9 ++ .../asyncevents/EventBuilderImpl.java | 51 +++++++-- .../asyncevents/model/AsyncEvent.java | 3 +- .../model/CollectionDeleteEvent.java | 58 ++++++++++ .../asyncevents/model/EntityDeleteEvent.java | 12 ++ .../index/CollectionDeleteService.java | 30 +++++ .../index/CollectionDeleteServiceImpl.java | 57 ++++++++++ .../corepersistence/index/CollectionScope.java | 29 +++++ .../index/CollectionScopeImpl.java | 92 +++++++++++++++ .../index/CollectionSettingsImpl.java | 19 +++- .../index/CollectionVersionCache.java | 57 ++++++++++ .../index/CollectionVersionFig.java | 53 +++++++++ .../index/CollectionVersionManager.java | 36 ++++++ .../index/CollectionVersionManagerFactory.java | 65 +++++++++++ .../index/CollectionVersionManagerImpl.java | 111 ++++++++++++++++++ .../index/CollectionVersionUtil.java | 80 +++++++++++++ .../corepersistence/index/IndexServiceImpl.java | 6 +- .../index/VersionedCollectionName.java | 29 +++++ .../index/VersionedCollectionNameImpl.java | 80 +++++++++++++ .../rx/impl/AllEntityIdsObservableImpl.java | 5 +- .../corepersistence/util/CpNamingUtils.java | 15 ++- .../usergrid/persistence/EntityManager.java | 4 +- .../apache/usergrid/persistence/Results.java | 25 +++- .../persistence/entities/Application.java | 26 +++++ .../apache/usergrid/utils/InflectionUtils.java | 42 ++++++- .../corepersistence/AggregationServiceTest.java | 7 ++ .../index/AsyncEventServiceImplTest.java | 9 +- .../index/CollectionVersionTest.java | 23 ++++ .../usergrid/persistence/RebuildIndexTest.java | 5 +- .../rest/applications/CollectionResource.java | 58 ++++++++++ .../rest/applications/ServiceResource.java | 15 +++ .../CollectionDeleteTooSoonExceptionMapper.java | 44 ++++++++ .../apache/usergrid/services/ServiceInfo.java | 17 +++ .../usergrid/services/ServiceManager.java | 49 +++++++- .../services/ServiceManagerFactory.java | 6 +- .../usergrid/services/ActivitiesServiceIT.java | 2 + .../usergrid/services/CollectionServiceIT.java | 6 +- 44 files changed, 1410 insertions(+), 61 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/usergrid/blob/7d3eb647/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java index ec6b775..5515abd 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java @@ -104,6 +104,8 @@ public class CoreModule extends AbstractModule { bind( ApplicationIdCacheFactory.class ); bind( CollectionSettingsFactory.class ); bind( CollectionSettingsCache.class ); + bind( CollectionVersionManagerFactory.class ); + bind( CollectionVersionCache.class ); /** @@ -141,6 +143,8 @@ public class CoreModule extends AbstractModule { bind( ReIndexService.class ).to( ReIndexServiceImpl.class ); + bind( CollectionDeleteService.class ).to( CollectionDeleteServiceImpl.class ); + bind( ExportService.class ).to( ExportServiceImpl.class ); install( new FactoryModuleBuilder().implement( AggregationService.class, AggregationServiceImpl.class ) @@ -157,6 +161,8 @@ public class CoreModule extends AbstractModule { install( new GuicyFigModule( CollectionSettingsCacheFig.class ) ); + install( new GuicyFigModule( CollectionVersionFig.class ) ); + install( new GuicyFigModule( EntityManagerFig.class ) ); install( new GuicyFigModule( AsyncEventsSchedulerFig.class ) ); http://git-wip-us.apache.org/repos/asf/usergrid/blob/7d3eb647/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java index cdb4fc7..ad5220b 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java @@ -29,9 +29,7 @@ import me.prettyprint.hector.api.query.QueryResult; import me.prettyprint.hector.api.query.SliceCounterQuery; import org.apache.commons.lang.NullArgumentException; import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService; -import org.apache.usergrid.corepersistence.index.CollectionSettings; -import org.apache.usergrid.corepersistence.index.CollectionSettingsFactory; -import org.apache.usergrid.corepersistence.index.CollectionSettingsScopeImpl; +import org.apache.usergrid.corepersistence.index.*; import org.apache.usergrid.corepersistence.service.CollectionService; import org.apache.usergrid.corepersistence.service.ConnectionService; import org.apache.usergrid.corepersistence.util.CpEntityMapUtils; @@ -160,6 +158,8 @@ public class CpEntityManager implements EntityManager { private EntityCollectionManager ecm; public QueueManagerFactory queueManagerFactory; + private CollectionDeleteService collectionDeleteService; + private CollectionVersionManagerFactory collectionVersionManagerFactory; // /** Short-term cache to keep us from reloading same Entity during single request. */ @@ -186,7 +186,9 @@ public class CpEntityManager implements EntityManager { final ConnectionService connectionService, final CollectionSettingsFactory collectionSettingsFactory, final UUID applicationId, - final QueueManagerFactory queueManagerFactory) { + final QueueManagerFactory queueManagerFactory, + final CollectionDeleteService collectionDeleteService, + final CollectionVersionManagerFactory collectionVersionManagerFactory) { this.entityManagerFig = entityManagerFig; this.actorSystemFig = actorSystemFig; @@ -253,6 +255,8 @@ public class CpEntityManager implements EntityManager { this.skipAggregateCounters = false; this.queueManagerFactory = queueManagerFactory; + this.collectionDeleteService = collectionDeleteService; + this.collectionVersionManagerFactory = collectionVersionManagerFactory; } @@ -735,7 +739,22 @@ public class CpEntityManager implements EntityManager { @Override public Set<String> getApplicationCollections() throws Exception { - Set<String> existingCollections = getRelationManager( getApplication() ).getCollections(); + Set<String> existingCollections = new HashSet<>(); + for (String existingCollection : getRelationManager( getApplication() ).getCollections()) { + if (Application.isCustomCollectionName(existingCollection)) { + // check for correct version + VersionedCollectionName v = CollectionVersionUtil.parseVersionedName(existingCollection); + CollectionVersionManager cvm = collectionVersionManagerFactory.getInstance( + new CollectionScopeImpl(getApplication().asId(), v.getCollectionName()) + ); + String currentVersion = cvm.getCollectionVersion(true); + if (!v.getCollectionVersion().equals(currentVersion)) { + // not the right version, skip it + continue; + } + existingCollections.add(existingCollection); + } + } Set<String> system_collections = Schema.getDefaultSchema().getCollectionNames( Application.ENTITY_TYPE ); if ( system_collections != null ) { @@ -765,12 +784,13 @@ public class CpEntityManager implements EntityManager { if ( !Schema.isAssociatedEntityType( collectionName ) ) { Long count = counts.get( APPLICATION_COLLECTION + collectionName ); + String unversionedCollectionName = CollectionVersionUtil.getBaseCollectionName(collectionName); Map<String, Object> entry = new HashMap<String, Object>(); entry.put( "count", count != null ? count : 0 ); - entry.put( "type", singularize( collectionName ) ); - entry.put( "name", collectionName ); - entry.put( "title", capitalize( collectionName ) ); - metadata.put( collectionName, entry ); + entry.put( "type", singularize( unversionedCollectionName ) ); + entry.put( "name", unversionedCollectionName ); + entry.put( "title", capitalize( unversionedCollectionName ) ); + metadata.put( unversionedCollectionName, entry ); } } } @@ -1870,6 +1890,13 @@ public class CpEntityManager implements EntityManager { } @Override + public void deleteCollection( String collectionName ){ + + collectionDeleteService.deleteCollection(applicationId, collectionName); + + } + + @Override public void grantRolePermission( String roleName, String permission ) throws Exception { roleName = roleName.toLowerCase(); permission = permission.toLowerCase(); @@ -2471,7 +2498,6 @@ public class CpEntityManager implements EntityManager { final Entity entity; - //this is the fall back, why isn't this writt if ( entityType == null ) { return null; // throw new EntityNotFoundException( String.format( "Counld not find type for uuid {}", uuid ) ); http://git-wip-us.apache.org/repos/asf/usergrid/blob/7d3eb647/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java index cec7258..b3dac57 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java @@ -26,9 +26,7 @@ import com.google.inject.Key; import com.google.inject.TypeLiteral; import org.apache.commons.lang.StringUtils; import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService; -import org.apache.usergrid.corepersistence.index.CollectionSettingsFactory; -import org.apache.usergrid.corepersistence.index.ReIndexRequestBuilder; -import org.apache.usergrid.corepersistence.index.ReIndexService; +import org.apache.usergrid.corepersistence.index.*; import org.apache.usergrid.corepersistence.service.CollectionService; import org.apache.usergrid.corepersistence.service.ConnectionService; import org.apache.usergrid.corepersistence.util.CpNamingUtils; @@ -119,6 +117,8 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application private final CollectionSettingsFactory collectionSettingsFactory; private ActorSystemManager actorSystemManager; private final LockManager lockManager; + private final CollectionDeleteService collectionDeleteService; + private final CollectionVersionManagerFactory collectionVersionManagerFactory; private final QueueManagerFactory queueManagerFactory; @@ -143,6 +143,8 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application this.collectionService = injector.getInstance( CollectionService.class ); this.connectionService = injector.getInstance( ConnectionService.class ); this.collectionSettingsFactory = injector.getInstance( CollectionSettingsFactory.class ); + this.collectionDeleteService = injector.getInstance( CollectionDeleteService.class ); + this.collectionVersionManagerFactory = injector.getInstance( CollectionVersionManagerFactory.class ); Properties properties = cassandraService.getProperties(); this.entityManagers = createEntityManagerCache( properties ); @@ -392,7 +394,9 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application connectionService, collectionSettingsFactory, applicationId, - queueManagerFactory); + queueManagerFactory, + collectionDeleteService, + collectionVersionManagerFactory); return em; } http://git-wip-us.apache.org/repos/asf/usergrid/blob/7d3eb647/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java index cab4e3e..5fe4295 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java @@ -20,6 +20,7 @@ package org.apache.usergrid.corepersistence.asyncevents; +import org.apache.usergrid.corepersistence.index.CollectionScope; import org.apache.usergrid.corepersistence.index.ReIndexAction; import org.apache.usergrid.persistence.core.scope.ApplicationScope; import org.apache.usergrid.persistence.graph.Edge; @@ -96,6 +97,13 @@ public interface AsyncEventService extends ReIndexAction { void queueDeIndexOldVersion(final ApplicationScope applicationScope, final Id entityId, UUID markedVersion); /** + * The version of a collection has been changed, queue cleanup of old version + * @param collectionScope + * @param collectionVersion + */ + void queueCollectionDelete(final CollectionScope collectionScope, final String collectionVersion); + + /** * current queue depth * @return */ http://git-wip-us.apache.org/repos/asf/usergrid/blob/7d3eb647/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java index 530cf7d..5628a11 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java @@ -30,10 +30,8 @@ import com.google.common.base.Preconditions; import com.google.inject.Inject; import com.google.inject.Singleton; import org.apache.usergrid.corepersistence.asyncevents.model.*; -import org.apache.usergrid.corepersistence.index.EntityIndexOperation; -import org.apache.usergrid.corepersistence.index.IndexLocationStrategyFactory; -import org.apache.usergrid.corepersistence.index.IndexProcessorFig; -import org.apache.usergrid.corepersistence.index.ReplicatedIndexLocationStrategy; +import org.apache.usergrid.corepersistence.index.*; +import org.apache.usergrid.corepersistence.rx.impl.AllEntityIdsObservable; import org.apache.usergrid.corepersistence.rx.impl.EdgeScope; import org.apache.usergrid.corepersistence.util.CpNamingUtils; import org.apache.usergrid.corepersistence.util.ObjectJsonSerializer; @@ -70,11 +68,11 @@ import java.io.IOException; import java.io.Serializable; import java.util.*; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; import java.util.stream.Stream; -import static org.apache.commons.lang.StringUtils.indexOf; import static org.apache.commons.lang.StringUtils.isNotEmpty; @@ -113,12 +111,14 @@ public class AsyncEventServiceImpl implements AsyncEventService { private final LegacyQueueManager utilityQueueDead; private final IndexProcessorFig indexProcessorFig; private final LegacyQueueFig queueFig; + private final CollectionVersionFig collectionVersionFig; private final IndexProducer indexProducer; private final EntityCollectionManagerFactory entityCollectionManagerFactory; private final IndexLocationStrategyFactory indexLocationStrategyFactory; private final EntityIndexFactory entityIndexFactory; private final EventBuilder eventBuilder; private final RxTaskScheduler rxTaskScheduler; + private final AllEntityIdsObservable allEntityIdsObservable; private final Timer readTimer; private final Timer writeTimer; @@ -153,6 +153,8 @@ public class AsyncEventServiceImpl implements AsyncEventService { final EventBuilder eventBuilder, final MapManagerFactory mapManagerFactory, final LegacyQueueFig queueFig, + final CollectionVersionFig collectionVersionFig, + final AllEntityIdsObservable allEntityIdsObservable, @EventExecutionScheduler final RxTaskScheduler rxTaskScheduler ) { this.indexProducer = indexProducer; @@ -187,6 +189,8 @@ public class AsyncEventServiceImpl implements AsyncEventService { this.indexProcessorFig = indexProcessorFig; this.queueFig = queueFig; + this.collectionVersionFig = collectionVersionFig; + this.allEntityIdsObservable = allEntityIdsObservable; this.writeTimer = metricsFactory.getTimer(AsyncEventServiceImpl.class, "async_event.write"); this.readTimer = metricsFactory.getTimer(AsyncEventServiceImpl.class, "async_event.read"); @@ -211,16 +215,25 @@ public class AsyncEventServiceImpl implements AsyncEventService { * Offer the EntityIdScope to SQS */ private void offer(final Serializable operation) { + offer(operation, false); + } + + private void offer(final Serializable operation, boolean forUtilityQueue) { final Timer.Context timer = this.writeTimer.time(); try { //signal to SQS - this.indexQueue.sendMessageToLocalRegion( operation ); + if (forUtilityQueue) { + this.indexQueue.sendMessageToLocalRegion(operation); + } else { + this.indexQueue.sendMessageToLocalRegion(operation); + } } catch (IOException e) { throw new RuntimeException("Unable to queue message", e); } finally { timer.stop(); } + } @@ -479,6 +492,10 @@ public class AsyncEventServiceImpl implements AsyncEventService { single = handleDeIndexOldVersionEvent((DeIndexOldVersionsEvent) event); + } else if (event instanceof CollectionDeleteEvent) { + + handleCollectionDelete(message); + } else { throw new Exception("Unknown EventType for message: "+ message.getStringBody().trim()); @@ -487,6 +504,7 @@ public class AsyncEventServiceImpl implements AsyncEventService { if( !(event instanceof ElasticsearchIndexEvent) && !(event instanceof InitializeApplicationIndexEvent) + && !(event instanceof CollectionDeleteEvent) && single.isEmpty() ){ logger.warn("No index operation messages came back from event processing for eventType: {}, msgId: {}, msgBody: {}", event.getClass().getSimpleName(), message.getMessageId(), message.getStringBody()); @@ -821,7 +839,9 @@ public class AsyncEventServiceImpl implements AsyncEventService { @Override public void queueEntityDelete(final ApplicationScope applicationScope, final Id entityId) { - logger.trace("Offering EntityDeleteEvent for {}:{}", entityId.getUuid(), entityId.getType()); + if (logger.isDebugEnabled()) { + logger.debug("Offering EntityDeleteEvent for {}:{}", entityId.getUuid(), entityId.getType()); + } // sent in region (not offerTopic) as the delete IO happens in-region, then queues a multi-region de-index op offer( new EntityDeleteEvent(queueFig.getPrimaryRegion(), new EntityIdScope( applicationScope, entityId ) ) ); @@ -840,12 +860,15 @@ public class AsyncEventServiceImpl implements AsyncEventService { final EntityDeleteEvent entityDeleteEvent = ( EntityDeleteEvent ) event; final ApplicationScope applicationScope = entityDeleteEvent.getEntityIdScope().getApplicationScope(); final Id entityId = entityDeleteEvent.getEntityIdScope().getId(); + final boolean markedOnly = entityDeleteEvent.markedOnly(); - if (logger.isDebugEnabled()) + if (logger.isDebugEnabled()) { logger.debug("Deleting entity id from index in app scope {} with entityId {}", applicationScope, entityId); + } - final EventBuilderImpl.EntityDeleteResults - entityDeleteResults = eventBuilder.buildEntityDelete( applicationScope, entityId ); + final EventBuilderImpl.EntityDeleteResults entityDeleteResults = markedOnly ? + eventBuilder.buildEntityDelete( applicationScope, entityId ) : + eventBuilder.buildEntityDeleteAllVersions( applicationScope, entityId ); // Delete the entities and remove from graph separately @@ -858,6 +881,76 @@ public class AsyncEventServiceImpl implements AsyncEventService { } + @Override + public void queueCollectionDelete(final CollectionScope collectionScope, final String collectionVersion) { + + if (logger.isDebugEnabled()) { + logger.debug("Offering CollectionDeleteEvent for application={}, collectionName={}, collectionVersion={}", + collectionScope.getApplication().getUuid(), collectionScope.getCollectionName(), collectionVersion); + } + + // sent in region (not offerTopic) as the delete IO happens in-region, then queues a multi-region de-index op + offer(new CollectionDeleteEvent(queueFig.getPrimaryRegion(), collectionScope, collectionVersion), true); + } + + private void handleCollectionDelete(final LegacyQueueMessage message) { + + Preconditions.checkNotNull(message, "Queue Message cannot be null for handleCollectionDelete"); + + final AsyncEvent event = (AsyncEvent) message.getBody(); + Preconditions.checkNotNull(event, "QueueMessage body cannot be null for handleCollectionDelete" ); + Preconditions.checkArgument( event instanceof CollectionDeleteEvent, + String.format( "Event Type for handleCollectionDelete must be COLLECTION_DELETE, got %s", event.getClass() ) ); + + final CollectionDeleteEvent collectionDeleteEvent = ( CollectionDeleteEvent ) event; + final CollectionScope collectionScope = collectionDeleteEvent.getCollectionScope(); + if (collectionScope == null) { + logger.error("CollectionDeleteEvent received with null collectionScope"); + // ack message, nothing more to do + return; + } + final UUID applicationID = collectionScope.getApplication().getUuid(); + if (applicationID == null) { + logger.error("CollectionDeleteEvent collectionScope has null application"); + // ack message, nothing more to do + return; + } + String collectionVersion = collectionDeleteEvent.getCollectionVersion(); + if (collectionVersion == null) { + collectionVersion = ""; + } + final ApplicationScope applicationScope = CpNamingUtils.getApplicationScope(applicationID); + final String versionedCollectionName = + CollectionVersionUtil.buildVersionedNameString(collectionScope.getCollectionName(), + collectionVersion, false); + + + final AtomicInteger count = new AtomicInteger(); + int maxDeletes = collectionVersionFig.getDeletesPerEvent(); + + if (logger.isDebugEnabled()) { + logger.debug("handleCollectionDelete: applicationScope={} collectionName={} maxDeletes={}", applicationScope.toString(), versionedCollectionName, maxDeletes); + } + allEntityIdsObservable.getEdgesToEntities(Observable.just(applicationScope), + Optional.fromNullable(CpNamingUtils.getEdgeTypeFromCollectionName(versionedCollectionName.toLowerCase())), Optional.absent()) + //.takeWhile(edgeScope-> count.intValue() < maxDeletes) + .take(maxDeletes) + .doOnNext(edgeScope-> { + + offer(new EntityDeleteEvent(queueFig.getPrimaryRegion(), + new EntityIdScope(applicationScope, edgeScope.getEdge().getTargetNode()),false), + true); + count.incrementAndGet(); + }).toBlocking().lastOrDefault(null); + + logger.info("handleCollectionDelete: queued {} entity deletes for deleted collection", count.intValue()); + + if (count.intValue() >= maxDeletes) { + // requeue collection delete for next chunk of deletes + offer (new CollectionDeleteEvent(queueFig.getPrimaryRegion(), collectionScope, collectionVersion), true); + } + } + private void handleInitializeApplicationIndex(final AsyncEvent event, final LegacyQueueMessage message) { Preconditions.checkNotNull(message, "Queue Message cannot be null for handleInitializeApplicationIndex"); http://git-wip-us.apache.org/repos/asf/usergrid/blob/7d3eb647/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java index 2ba6c0b..31fcd6d 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java @@ -20,8 +20,10 @@ package org.apache.usergrid.corepersistence.asyncevents; +import org.apache.usergrid.corepersistence.index.CollectionVersionFig; import org.apache.usergrid.corepersistence.index.IndexLocationStrategyFactory; import org.apache.usergrid.corepersistence.index.IndexProcessorFig; +import org.apache.usergrid.corepersistence.rx.impl.AllEntityIdsObservable; import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory; import org.apache.usergrid.persistence.core.rx.RxTaskScheduler; import org.apache.usergrid.persistence.core.metrics.MetricsFactory; @@ -58,6 +60,8 @@ public class AsyncIndexProvider implements Provider<AsyncEventService> { private final IndexProducer indexProducer; private final MapManagerFactory mapManagerFactory; private final LegacyQueueFig queueFig; + private final CollectionVersionFig collectionVersionFig; + private final AllEntityIdsObservable allEntityIdsObservable; private AsyncEventService asyncEventService; @@ -73,7 +77,9 @@ public class AsyncIndexProvider implements Provider<AsyncEventService> { final EntityIndexFactory entityIndexFactory, final IndexProducer indexProducer, final MapManagerFactory mapManagerFactory, - final LegacyQueueFig queueFig) { + final LegacyQueueFig queueFig, + final CollectionVersionFig collectionVersionFig, + final AllEntityIdsObservable allEntityIdsObservable) { this.indexProcessorFig = indexProcessorFig; this.queueManagerFactory = queueManagerFactory; @@ -86,6 +92,8 @@ public class AsyncIndexProvider implements Provider<AsyncEventService> { this.indexProducer = indexProducer; this.mapManagerFactory = mapManagerFactory; this.queueFig = queueFig; + this.collectionVersionFig = collectionVersionFig; + this.allEntityIdsObservable = allEntityIdsObservable; } @@ -116,6 +124,8 @@ public class AsyncIndexProvider implements Provider<AsyncEventService> { eventBuilder, mapManagerFactory, queueFig, + collectionVersionFig, + allEntityIdsObservable, rxTaskScheduler ); if ( impl.equals( LOCAL )) { http://git-wip-us.apache.org/repos/asf/usergrid/blob/7d3eb647/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/CollectionDeleteTooSoonException.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/CollectionDeleteTooSoonException.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/CollectionDeleteTooSoonException.java new file mode 100644 index 0000000..bd37d46 --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/CollectionDeleteTooSoonException.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.corepersistence.asyncevents; + +public class CollectionDeleteTooSoonException extends RuntimeException { + + private final long timeLastDeleted; + private final long timeRequiredBeforeDeleteMsec; + + public CollectionDeleteTooSoonException(final long timeLastDeleted, final long timeRequiredBeforeDeleteMsec) { + this.timeLastDeleted = timeLastDeleted; + this.timeRequiredBeforeDeleteMsec = timeRequiredBeforeDeleteMsec; + } + + public long getTimeLastDeleted() { + return timeLastDeleted; + } + + public long getTimeRequiredBeforeDeleteMsec() { + return timeRequiredBeforeDeleteMsec; + } +} http://git-wip-us.apache.org/repos/asf/usergrid/blob/7d3eb647/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java index 4db9f4b..8618c73 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java @@ -64,6 +64,15 @@ public interface EventBuilder { */ EntityDeleteResults buildEntityDelete(ApplicationScope applicationScope, Id entityId ); + /** + * Return a bin with 2 observable streams for entity delete. This deletes all versions -- used only for an old + * collection version. Does not require versions to be marked for deletion. + * @param applicationScope + * @param entityId + * @return + */ + EntityDeleteResults buildEntityDeleteAllVersions(ApplicationScope applicationScope, Id entityId ); + /** http://git-wip-us.apache.org/repos/asf/usergrid/blob/7d3eb647/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java index bbdce5a..33d384e 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java @@ -105,10 +105,10 @@ public class EventBuilderImpl implements EventBuilder { //Does the queue entityDelete mark the entity then immediately does to the deleteEntityIndex. seems like //it'll need to be pushed up higher so we can do the marking that isn't async or does it not matter? - @Override - public EntityDeleteResults buildEntityDelete(final ApplicationScope applicationScope, final Id entityId ) { + private EntityDeleteResults buildEntityDeleteCommon(final ApplicationScope applicationScope, final Id entityId, boolean markedOnly) { if (logger.isDebugEnabled()) { - logger.debug("Deleting entity id from index in app scope {} with entityId {}", applicationScope, entityId); + logger.debug("Deleting entity id ({} versions) from index in app scope {} with entityId {}", + markedOnly ? "marked" : "all", applicationScope, entityId); } final EntityCollectionManager ecm = entityCollectionManagerFactory.createCollectionManager( applicationScope ); @@ -116,25 +116,30 @@ public class EventBuilderImpl implements EventBuilder { //TODO USERGRID-1123: Implement so we don't iterate logs twice (latest DELETED version, then to get all DELETED) - MvccLogEntry mostRecentlyMarked = ecm.getVersionsFromMaxToMin( entityId, UUIDUtils.newTimeUUID() ).toBlocking() - .firstOrDefault( null, mvccLogEntry -> mvccLogEntry.getState() == MvccLogEntry.State.DELETED ); + MvccLogEntry mostRecentToDelete = markedOnly ? + ecm.getVersionsFromMaxToMin( entityId, UUIDUtils.newTimeUUID() ).toBlocking() + .firstOrDefault( null, mvccLogEntry -> mvccLogEntry.getState() == MvccLogEntry.State.DELETED ) : + ecm.getVersionsFromMaxToMin( entityId, UUIDUtils.newTimeUUID() ).toBlocking() + .firstOrDefault( null ); // De-indexing and entity deletes don't check log entries. We must do that first. If no DELETED logs, then // return an empty observable as our no-op. Observable<IndexOperationMessage> deIndexObservable = Observable.empty(); Observable<List<MvccLogEntry>> ecmDeleteObservable = Observable.empty(); - if(mostRecentlyMarked != null){ + if(mostRecentToDelete != null || !markedOnly){ // fetch entity versions to be de-index by looking in cassandra - deIndexObservable = - indexService.deIndexEntity(applicationScope, entityId, mostRecentlyMarked.getVersion(), - getVersionsOlderThanMarked(ecm, entityId, mostRecentlyMarked.getVersion())); + deIndexObservable = markedOnly ? + indexService.deIndexEntity(applicationScope, entityId, mostRecentToDelete.getVersion(), + getVersionsOlderThanMarked(ecm, entityId, mostRecentToDelete.getVersion())) : + indexService.deIndexEntity(applicationScope, entityId, UUIDUtils.newTimeUUID(), + getAllVersions(ecm, entityId)); ecmDeleteObservable = - ecm.getVersionsFromMaxToMin( entityId, mostRecentlyMarked.getVersion() ) + ecm.getVersionsFromMaxToMin( entityId, mostRecentToDelete.getVersion() ) .filter( mvccLogEntry-> - mvccLogEntry.getVersion().timestamp() <= mostRecentlyMarked.getVersion().timestamp() ) + mvccLogEntry.getVersion().timestamp() <= mostRecentToDelete.getVersion().timestamp() ) .buffer( serializationFig.getBufferSize() ) .doOnNext( buffer -> ecm.delete( buffer ) ); } @@ -146,6 +151,17 @@ public class EventBuilderImpl implements EventBuilder { } @Override + public EntityDeleteResults buildEntityDelete(final ApplicationScope applicationScope, final Id entityId ) { + return buildEntityDeleteCommon(applicationScope, entityId, true); + } + + // this deletes all versions of an entity, only used for collection delete + @Override + public EntityDeleteResults buildEntityDeleteAllVersions(final ApplicationScope applicationScope, final Id entityId ) { + return buildEntityDeleteCommon(applicationScope, entityId, false); + } + + @Override public Observable<IndexOperationMessage> buildEntityIndex( final EntityIndexOperation entityIndexOperation ) { final ApplicationScope applicationScope = entityIndexOperation.getApplicationScope(); @@ -210,4 +226,17 @@ public class EventBuilderImpl implements EventBuilder { return versions; } + private List<UUID> getAllVersions( final EntityCollectionManager ecm, + final Id entityId ) { + + final List<UUID> versions = new ArrayList<>(); + + ecm.getVersionsFromMaxToMin(entityId, UUIDUtils.newTimeUUID()) + .forEach( mvccLogEntry -> { + versions.add(mvccLogEntry.getVersion()); + }); + + return versions; + } + } http://git-wip-us.apache.org/repos/asf/usergrid/blob/7d3eb647/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java index bd581ad..0ea0fdc 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java @@ -44,7 +44,8 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo; @JsonSubTypes.Type( value = EntityDeleteEvent.class, name = "entityDeleteEvent" ), @JsonSubTypes.Type( value = InitializeApplicationIndexEvent.class, name = "initializeApplicationIndexEvent" ), @JsonSubTypes.Type( value = ElasticsearchIndexEvent.class, name = "elasticsearchIndexEvent" ), - @JsonSubTypes.Type( value = DeIndexOldVersionsEvent.class, name = "deIndexOldVersionsEvent" ) + @JsonSubTypes.Type( value = DeIndexOldVersionsEvent.class, name = "deIndexOldVersionsEvent" ), + @JsonSubTypes.Type( value = CollectionDeleteEvent.class, name = "collectionDeleteEvent" ) } ) http://git-wip-us.apache.org/repos/asf/usergrid/blob/7d3eb647/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/CollectionDeleteEvent.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/CollectionDeleteEvent.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/CollectionDeleteEvent.java new file mode 100644 index 0000000..9fc978c --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/CollectionDeleteEvent.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.corepersistence.asyncevents.model; + +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.usergrid.corepersistence.index.CollectionScope; + +/** + * Event that will signal to queue up entity deletes for a collection delete. + */ +public final class CollectionDeleteEvent extends AsyncEvent { + + + @JsonProperty + protected CollectionScope collectionScope; + + @JsonProperty + protected String collectionVersion; + + /** + * Do not delete! Needed for Jackson + */ + @SuppressWarnings( "unused" ) + public CollectionDeleteEvent() { + super(); + } + + public CollectionDeleteEvent(String sourceRegion, CollectionScope collectionScope, String collectionVersion) { + super(sourceRegion); + this.collectionScope = collectionScope; + this.collectionVersion = collectionVersion; + } + + public CollectionScope getCollectionScope() { + return collectionScope; + } + + public String getCollectionVersion() { + return collectionVersion; + } +} http://git-wip-us.apache.org/repos/asf/usergrid/blob/7d3eb647/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EntityDeleteEvent.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EntityDeleteEvent.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EntityDeleteEvent.java index 01d2ba8..aa6a15b 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EntityDeleteEvent.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EntityDeleteEvent.java @@ -31,17 +31,29 @@ public final class EntityDeleteEvent extends AsyncEvent { @JsonProperty protected EntityIdScope entityIdScope; + @JsonProperty + protected boolean markedOnly; + public EntityDeleteEvent() { super(); } public EntityDeleteEvent(String sourceRegion, EntityIdScope entityIdScope) { + this(sourceRegion, entityIdScope, true); + } + + public EntityDeleteEvent(String sourceRegion, EntityIdScope entityIdScope, boolean markedOnly) { super(sourceRegion); this.entityIdScope = entityIdScope; + this.markedOnly = markedOnly; } public EntityIdScope getEntityIdScope() { return entityIdScope; } + + public boolean markedOnly() { + return markedOnly; + } } http://git-wip-us.apache.org/repos/asf/usergrid/blob/7d3eb647/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionDeleteService.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionDeleteService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionDeleteService.java new file mode 100644 index 0000000..85b8fed --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionDeleteService.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.corepersistence.index; + +import java.util.UUID; + +public interface CollectionDeleteService { + + /** + * Delete the current version of a collection by changing the collection version and queueing up a delete of the old entities + */ + void deleteCollection(final UUID applicationID, final String baseCollectionName); +} http://git-wip-us.apache.org/repos/asf/usergrid/blob/7d3eb647/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionDeleteServiceImpl.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionDeleteServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionDeleteServiceImpl.java new file mode 100644 index 0000000..5c64079 --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionDeleteServiceImpl.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.corepersistence.index; + +import com.google.inject.Inject; +import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.UUID; + +public class CollectionDeleteServiceImpl implements CollectionDeleteService { + private static final Logger logger = LoggerFactory.getLogger(CollectionDeleteServiceImpl.class ); + + private CollectionVersionManagerFactory collectionVersionManagerFactory; + private AsyncEventService asyncEventService; + + @Inject + public CollectionDeleteServiceImpl( + final CollectionVersionManagerFactory collectionVersionManagerFactory, + final AsyncEventService asyncEventService + ) + { + this.collectionVersionManagerFactory = collectionVersionManagerFactory; + this.asyncEventService = asyncEventService; + } + + @Override + public void deleteCollection(final UUID applicationID, final String baseCollectionName) { + CollectionScope scope = new CollectionScopeImpl(applicationID, baseCollectionName); + CollectionVersionManager collectionVersionManager = collectionVersionManagerFactory.getInstance(scope); + + // change version + String oldVersion = collectionVersionManager.updateCollectionVersion(); + + // queue up delete of old version entities + asyncEventService.queueCollectionDelete(scope, oldVersion); + } + +} http://git-wip-us.apache.org/repos/asf/usergrid/blob/7d3eb647/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionScope.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionScope.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionScope.java new file mode 100644 index 0000000..9ec3ad9 --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionScope.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.usergrid.corepersistence.index; + +import com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import org.apache.usergrid.persistence.core.scope.ApplicationScope; + + +@JsonDeserialize(as = CollectionScopeImpl.class) +public interface CollectionScope extends ApplicationScope { + + String getCollectionName(); + +} http://git-wip-us.apache.org/repos/asf/usergrid/blob/7d3eb647/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionScopeImpl.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionScopeImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionScopeImpl.java new file mode 100644 index 0000000..6c29ee5 --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionScopeImpl.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.usergrid.corepersistence.index; + +import org.apache.commons.lang.builder.HashCodeBuilder; +import org.apache.usergrid.persistence.model.entity.Id; +import org.apache.usergrid.persistence.model.entity.SimpleId; + +import java.util.UUID; + +import static org.apache.usergrid.persistence.Schema.TYPE_APPLICATION; + + +public class CollectionScopeImpl implements CollectionScope { + + protected Id application; + protected String collectionName; + + + /** + * Do not delete! Needed for Jackson + */ + @SuppressWarnings( "unused" ) + public CollectionScopeImpl(){ + + } + + public CollectionScopeImpl(final Id application, final String collectionName ) { + this.application = application; + this.collectionName = collectionName; + } + + public CollectionScopeImpl(final UUID applicationID, final String collectionName) { + this(new SimpleId(applicationID, TYPE_APPLICATION), collectionName); + } + + @Override + public String getCollectionName() { + return collectionName; + } + + @Override + public Id getApplication() { + return application; + } + + @Override + public boolean equals( final Object o ) { + if ( this == o ) { + return true; + } + if ( !( o instanceof CollectionScopeImpl) ) { + return false; + } + + final CollectionScopeImpl collectionVersionScope = (CollectionScopeImpl) o; + + if ( !application.equals( collectionVersionScope.application) ) { + return false; + } + + if ( !collectionName.equals( collectionVersionScope.collectionName ) ) { + return false; + } + + return true; + } + + @Override + public int hashCode() { + return new HashCodeBuilder() + .append(application) + .append(collectionName) + .toHashCode(); + } + +} http://git-wip-us.apache.org/repos/asf/usergrid/blob/7d3eb647/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionSettingsImpl.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionSettingsImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionSettingsImpl.java index 921777a..74acd09 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionSettingsImpl.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionSettingsImpl.java @@ -20,7 +20,6 @@ package org.apache.usergrid.corepersistence.index; import com.google.common.base.Optional; import com.google.inject.Inject; -import com.google.inject.Singleton; import org.apache.usergrid.persistence.map.MapManager; import org.apache.usergrid.utils.JsonUtils; import org.slf4j.Logger; @@ -51,13 +50,15 @@ public class CollectionSettingsImpl implements CollectionSettings { @Override public Optional<Map<String, Object>> getCollectionSettings(final String collectionName ) { + // collectionName may be a versioned collection name -- get the base name + String baseCollectionName = CollectionVersionUtil.parseVersionedName(collectionName).getCollectionName(); + String settings; settings = cache.get(scope); if( settings == null ) { - settings = mapManager.getString(collectionName); - + settings = mapManager.getString(baseCollectionName); } if (settings != null) { @@ -77,14 +78,22 @@ public class CollectionSettingsImpl implements CollectionSettings { @Override public void putCollectionSettings(final String collectionName, final String collectionSchema ){ - mapManager.putString( collectionName, collectionSchema ); + + // collectionName may be a versioned collection name -- get the base name + String baseCollectionName = CollectionVersionUtil.parseVersionedName(collectionName).getCollectionName(); + + mapManager.putString( baseCollectionName, collectionSchema ); cache.put(scope, collectionSchema); } @Override public void deleteCollectionSettings(final String collectionName){ - mapManager.delete( collectionName ); + + // collectionName may be a versioned collection name -- get the base name + String baseCollectionName = CollectionVersionUtil.parseVersionedName(collectionName).getCollectionName(); + + mapManager.delete( baseCollectionName ); cache.invalidate( scope ); } http://git-wip-us.apache.org/repos/asf/usergrid/blob/7d3eb647/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionVersionCache.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionVersionCache.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionVersionCache.java new file mode 100644 index 0000000..e4e8e93 --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionVersionCache.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.usergrid.corepersistence.index; + +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import com.google.inject.Inject; +import com.google.inject.Singleton; + +import java.util.concurrent.TimeUnit; + +@Singleton +public class CollectionVersionCache { + + private final Cache<CollectionScope,String> cache; + + + @Inject + public CollectionVersionCache(CollectionVersionFig fig ) { + this.cache = CacheBuilder.newBuilder() + .maximumSize(fig.getCacheSize()) + .expireAfterWrite(fig.getCacheTimeout(), TimeUnit.SECONDS).build(); + } + + + public void put(CollectionScope key, String value){ + cache.put(key, value); + } + + public String get(CollectionScope key){ + return cache.getIfPresent(key); + } + + public void invalidate(CollectionScope key){ + cache.invalidate(key); + } + + public void invalidateAll(){ + cache.invalidateAll(); + } + +} http://git-wip-us.apache.org/repos/asf/usergrid/blob/7d3eb647/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionVersionFig.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionVersionFig.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionVersionFig.java new file mode 100644 index 0000000..3bb75c7 --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionVersionFig.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.usergrid.corepersistence.index; + +import org.safehaus.guicyfig.Default; +import org.safehaus.guicyfig.FigSingleton; +import org.safehaus.guicyfig.GuicyFig; +import org.safehaus.guicyfig.Key; + + +/** + * Collection version cache config + */ +@FigSingleton +public interface CollectionVersionFig extends GuicyFig { + + String CACHE_SIZE = "usergrid.collection_version_cache_size"; + String CACHE_TIMEOUT_MS = "usergrid.collection_version_cache_timeout_ms"; + String TIME_BETWEEN_DELETES_MS = "usergrid.collection_version_time_between_deletes_ms"; + String DELETES_PER_EVENT = "usergrid.collection_deletes_per_event"; + + @Key(CACHE_SIZE) + @Default("500") + int getCacheSize(); + + @Key(CACHE_TIMEOUT_MS) + @Default("2000") + int getCacheTimeout(); + + @Key(TIME_BETWEEN_DELETES_MS) + @Default("60000") + long getTimeBetweenDeletes(); + + @Key(DELETES_PER_EVENT) + @Default("10000") + int getDeletesPerEvent(); + +} http://git-wip-us.apache.org/repos/asf/usergrid/blob/7d3eb647/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionVersionManager.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionVersionManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionVersionManager.java new file mode 100644 index 0000000..9768a55 --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionVersionManager.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.usergrid.corepersistence.index; + + +import org.apache.usergrid.corepersistence.asyncevents.CollectionDeleteTooSoonException; + +public interface CollectionVersionManager { + + /** + * Get the collection version from the cache. + */ + String getCollectionVersion(final boolean bypassCache); + + String getVersionedCollectionName(final boolean bypassCache); + + String updateCollectionVersion() throws CollectionDeleteTooSoonException; + + Long getTimeLastChanged(); + +} http://git-wip-us.apache.org/repos/asf/usergrid/blob/7d3eb647/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionVersionManagerFactory.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionVersionManagerFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionVersionManagerFactory.java new file mode 100644 index 0000000..7e7a2a7 --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionVersionManagerFactory.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.usergrid.corepersistence.index; + +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; +import com.google.inject.Inject; +import com.google.inject.Singleton; +import org.apache.usergrid.corepersistence.util.CpNamingUtils; +import org.apache.usergrid.persistence.map.MapManager; +import org.apache.usergrid.persistence.map.MapManagerFactory; + +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; + + +@Singleton +public class CollectionVersionManagerFactory { + + private final LoadingCache<CollectionScope,CollectionVersionManager> versionCache; + + @Inject + public CollectionVersionManagerFactory(final CollectionVersionFig fig, + final MapManagerFactory mapManagerFactory, + final CollectionVersionCache collectionVersionCache ){ + versionCache = CacheBuilder.newBuilder() + .maximumSize( fig.getCacheSize() ) + .expireAfterWrite( fig.getCacheTimeout(), TimeUnit.MILLISECONDS ) + .build( new CacheLoader<CollectionScope, CollectionVersionManager>() { + @Override + public CollectionVersionManager load(CollectionScope scope ) throws Exception { + + final MapManager mm = mapManagerFactory + .createMapManager( CpNamingUtils.getCollectionVersionMapScope(scope.getApplication())); + return new CollectionVersionManagerImpl( scope, mm, collectionVersionCache, fig ) ; + } + } ); + } + + + public CollectionVersionManager getInstance(CollectionScope scope ) { + try { + return versionCache.get(scope); + }catch (ExecutionException e){ + throw new RuntimeException(e); + } + } + +} http://git-wip-us.apache.org/repos/asf/usergrid/blob/7d3eb647/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionVersionManagerImpl.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionVersionManagerImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionVersionManagerImpl.java new file mode 100644 index 0000000..7ed557c --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionVersionManagerImpl.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.usergrid.corepersistence.index; + +import com.google.inject.Inject; +import org.apache.usergrid.corepersistence.asyncevents.CollectionDeleteTooSoonException; +import org.apache.usergrid.persistence.map.MapManager; +import org.apache.usergrid.persistence.model.util.UUIDGenerator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Cache collection version to reduce load on Cassandra. + */ +public class CollectionVersionManagerImpl implements CollectionVersionManager { + private static final Logger logger = LoggerFactory.getLogger(CollectionVersionManagerImpl.class ); + + private final MapManager mapManager; + private final CollectionVersionCache cache; + private final CollectionScope scope; + private final CollectionVersionFig collectionVersionFig; + private final String collectionName; + + private static final String MAP_PREFIX_VERSION = "VERSION:"; + private static final String MAP_PREFIX_LAST_CHANGED = "LASTCHANGED:"; + + @Inject + public CollectionVersionManagerImpl(CollectionScope scope, MapManager mapManager, CollectionVersionCache cache, CollectionVersionFig collectionVersionFig) { + this.scope = scope; + this.mapManager = mapManager; + this.cache = cache; + this.collectionVersionFig = collectionVersionFig; + this.collectionName = scope.getCollectionName(); + } + + @Override + public String getCollectionVersion(final boolean bypassCache) { + + String version = null; + if (!bypassCache) { + version = cache.get(scope); + } + + if( version == null ) { + version = mapManager.getString(MAP_PREFIX_VERSION+collectionName); + } + + if (version != null) { + return version; + }else{ + cache.put(scope, ""); // store empty string here so empty is cached as well + } + + return ""; + } + + @Override + public Long getTimeLastChanged() { + return mapManager.getLong(MAP_PREFIX_LAST_CHANGED+collectionName); + } + + @Override + public String getVersionedCollectionName(final boolean bypassCache) { + String collectionVersion = getCollectionVersion(bypassCache); + return CollectionVersionUtil.buildVersionedNameString(collectionName, collectionVersion, false); + } + + // returns old collection version + @Override + public String updateCollectionVersion() throws CollectionDeleteTooSoonException { + // check for time last changed + Long timeLastChanged = getTimeLastChanged(); + long timeBetweenDeletes = collectionVersionFig.getTimeBetweenDeletes(); + if (timeLastChanged != null) { + if (System.currentTimeMillis() - timeLastChanged < timeBetweenDeletes) { + // too soon + throw new CollectionDeleteTooSoonException(timeLastChanged, timeBetweenDeletes); + } + } + + String oldCollectionVersion = getCollectionVersion(true); + String newCollectionVersion = getNewCollectionVersion(); + mapManager.putLong(MAP_PREFIX_LAST_CHANGED+collectionName, System.currentTimeMillis()); + mapManager.putString(MAP_PREFIX_VERSION+collectionName, newCollectionVersion); + cache.put(scope, newCollectionVersion); + logger.info("Replacing collection version for collection {}, application {}: oldVersion={} newVersion={}", + collectionName, scope.getApplication().getUuid(), oldCollectionVersion, newCollectionVersion); + return oldCollectionVersion; + } + + private static String getNewCollectionVersion() { + return UUIDGenerator.newTimeUUID().toString(); + } + +} http://git-wip-us.apache.org/repos/asf/usergrid/blob/7d3eb647/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionVersionUtil.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionVersionUtil.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionVersionUtil.java new file mode 100644 index 0000000..46e4e09 --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionVersionUtil.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.usergrid.corepersistence.index; + + +import com.amazonaws.util.StringUtils; +import com.google.common.base.Preconditions; + +import java.util.regex.Pattern; + +public class CollectionVersionUtil { + private static final String VERSIONED_NAME_SEPARATOR = "%~!~%"; + + public static VersionedCollectionName parseVersionedName(String versionedCollectionNameString) throws IllegalArgumentException { + Preconditions.checkNotNull(versionedCollectionNameString, "collection name string is required"); + String collectionName; + String collectionVersion; + try { + String[] parts = versionedCollectionNameString.split(Pattern.quote(VERSIONED_NAME_SEPARATOR)); + if (parts.length == 2) { + collectionName = parts[0]; + collectionVersion = parts[1]; + } else if (parts.length == 1) { + collectionName = parts[0]; + collectionVersion = ""; + } else { + throw new IllegalArgumentException("Invalid format for versioned collection, versionedCollectionNameString=" + versionedCollectionNameString); + } + } catch (Exception e) { + throw new IllegalArgumentException("Unable to parse versioned collection, versionedCollectionNameString=" + versionedCollectionNameString, e); + } + return new VersionedCollectionNameImpl(collectionName, collectionVersion); + } + + public static String getBaseCollectionName(String versionedCollectionNameString) throws IllegalArgumentException { + return parseVersionedName(versionedCollectionNameString).getCollectionName(); + } + + public static boolean collectionNameHasVersion(String collectionNameString) { + try { + VersionedCollectionName parsedName = parseVersionedName(collectionNameString); + return !StringUtils.isNullOrEmpty(parsedName.getCollectionVersion()); + } + catch (Exception e) { + return false; + } + } + + public static String buildVersionedNameString(final String baseName, final String collectionVersion, + final boolean validateBaseName) throws IllegalArgumentException { + Preconditions.checkNotNull(baseName, "base name is required"); + if (validateBaseName && baseName.contains(VERSIONED_NAME_SEPARATOR)) { + throw new IllegalArgumentException("Cannot build versioned name using a base name that already includes the version separator"); + } + if (collectionVersion == null || collectionVersion == "") { + return baseName; + } + return baseName + VERSIONED_NAME_SEPARATOR + collectionVersion; + } + + public static VersionedCollectionName createVersionedName(String baseName, String collectionVersion) { + return new VersionedCollectionNameImpl(baseName, collectionVersion); + } + +} http://git-wip-us.apache.org/repos/asf/usergrid/blob/7d3eb647/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java index 1b8614f..8473b2e 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java @@ -36,9 +36,7 @@ import org.apache.usergrid.persistence.graph.impl.SimpleEdge; import org.apache.usergrid.persistence.graph.serialization.EdgesObservable; import org.apache.usergrid.persistence.index.*; import org.apache.usergrid.persistence.index.impl.IndexOperationMessage; -import org.apache.usergrid.persistence.map.MapManager; import org.apache.usergrid.persistence.map.MapManagerFactory; -import org.apache.usergrid.persistence.map.MapScope; import org.apache.usergrid.persistence.model.entity.Entity; import org.apache.usergrid.persistence.model.entity.Id; import org.apache.usergrid.persistence.model.entity.SimpleId; @@ -264,6 +262,10 @@ public class IndexServiceImpl implements IndexService { final EntityIndex ei = entityIndexFactory. createEntityIndex(indexLocationStrategyFactory.getIndexLocationStrategy(applicationScope) ); + if (logger.isDebugEnabled()) { + logger.debug("deIndexEntity: entityId={}:{}, markedVersion={}, otherVersionsSize={}", + entityId.getUuid().toString(), entityId.getType(), markedVersion.toString(), allVersionsBeforeMarked.size()); + } // use LONG.MAX_VALUE in search edge because this value is not used elsewhere in lower code foe de-indexing // previously .timstamp() was used on entityId, but some entities do not have type-1 UUIDS ( legacy data) http://git-wip-us.apache.org/repos/asf/usergrid/blob/7d3eb647/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/VersionedCollectionName.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/VersionedCollectionName.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/VersionedCollectionName.java new file mode 100644 index 0000000..87f4c05 --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/VersionedCollectionName.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.usergrid.corepersistence.index; + + +public interface VersionedCollectionName { + + String getCollectionName(); + + String getCollectionVersion(); + + boolean hasVersion(); + +} http://git-wip-us.apache.org/repos/asf/usergrid/blob/7d3eb647/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/VersionedCollectionNameImpl.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/VersionedCollectionNameImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/VersionedCollectionNameImpl.java new file mode 100644 index 0000000..5f84e54 --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/VersionedCollectionNameImpl.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.usergrid.corepersistence.index; + +import com.google.common.base.Preconditions; +import org.apache.commons.lang.builder.HashCodeBuilder; + + +public class VersionedCollectionNameImpl implements VersionedCollectionName { + + private final String collectionName; + private final String collectionVersion; + + public VersionedCollectionNameImpl(final String collectionName, final String collectionVersion ) { + Preconditions.checkNotNull(collectionName, "collection name is required"); + this.collectionName = collectionName; + this.collectionVersion = collectionVersion != null ? collectionVersion : ""; + } + + @Override + public String getCollectionName() { + return collectionName; + } + + @Override + public String getCollectionVersion() { + return collectionVersion; + } + + @Override + public boolean hasVersion() { + return !collectionVersion.equals(""); + } + + @Override + public boolean equals( final Object o ) { + if ( this == o ) { + return true; + } + if ( !( o instanceof VersionedCollectionNameImpl) ) { + return false; + } + + final VersionedCollectionNameImpl versionedCollectionName = (VersionedCollectionNameImpl) o; + + if ( !collectionName.equals( versionedCollectionName.collectionName ) ) { + return false; + } + if ( !collectionVersion.equals( versionedCollectionName.collectionVersion ) ) { + return false; + } + + return true; + } + + + @Override + public int hashCode() { + return new HashCodeBuilder() + .append(collectionName) + .append(collectionVersion) + .toHashCode(); + } + +} http://git-wip-us.apache.org/repos/asf/usergrid/blob/7d3eb647/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservableImpl.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservableImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservableImpl.java index 0420a32..13a85c4 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservableImpl.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservableImpl.java @@ -19,10 +19,7 @@ package org.apache.usergrid.corepersistence.rx.impl; - - - -import com.google.common.base.Optional; +import com.google.common.base.Optional; import com.google.inject.Inject; import com.google.inject.Singleton; http://git-wip-us.apache.org/repos/asf/usergrid/blob/7d3eb647/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpNamingUtils.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpNamingUtils.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpNamingUtils.java index 9c6e318..e9867b6 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpNamingUtils.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpNamingUtils.java @@ -1,4 +1,3 @@ -package org.apache.usergrid.corepersistence.util; /* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -18,6 +17,7 @@ package org.apache.usergrid.corepersistence.util; * under the License. */ +package org.apache.usergrid.corepersistence.util; import java.util.UUID; @@ -86,6 +86,11 @@ public class CpNamingUtils { */ public static String TYPES_BY_UUID_MAP = "zzz_typesbyuuid_zzz"; + /** + * The name of the map that holds our collection->version mapping + */ + public static String VERSION_FOR_COLLECTION_MAP = "zzz_versionforcollection_zzz"; + /** * Generate a standard edge name for our graph using the connection name. To be used only for searching. DO NOT use @@ -317,6 +322,14 @@ public class CpNamingUtils { /** + * Get the map scope for the applicationId to store collection name to collection version mapping + */ + public static MapScope getCollectionVersionMapScope( final Id applicationId ) { + return new MapScopeImpl( applicationId, CpNamingUtils.VERSION_FOR_COLLECTION_MAP ); + } + + + /** * Generate either the collection name or connection name from the edgeName */ public static String getNameFromEdgeType( final String edgeName ) { http://git-wip-us.apache.org/repos/asf/usergrid/blob/7d3eb647/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManager.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManager.java b/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManager.java index ae4623d..a977f31 100644 --- a/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManager.java +++ b/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManager.java @@ -530,6 +530,8 @@ public interface EntityManager { Object getCollectionSettings( String collectionName ); + void deleteCollection( String collectionName ); + public void grantRolePermission( String roleName, String permission ) throws Exception; public void grantRolePermissions( String roleName, Collection<String> permissions ) throws Exception; @@ -743,7 +745,7 @@ public interface EntityManager { /** * Add a new index to the application for scale - * @param suffix unique indentifier for additional index + * @param newIndexName unique identifier for additional index * @param shards number of shards * @param replicas number of replicas * @param writeConsistency only "one, quorum, or all" http://git-wip-us.apache.org/repos/asf/usergrid/blob/7d3eb647/stack/core/src/main/java/org/apache/usergrid/persistence/Results.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/Results.java b/stack/core/src/main/java/org/apache/usergrid/persistence/Results.java index 3502581..5917949 100644 --- a/stack/core/src/main/java/org/apache/usergrid/persistence/Results.java +++ b/stack/core/src/main/java/org/apache/usergrid/persistence/Results.java @@ -522,8 +522,31 @@ public class Results implements Iterable<Entity> { level = Level.CORE_PROPERTIES; } + // returns null if index out of range + public Entity getEntity( final int index ) { + if (index < 0) { + return null; + } + if (entities == null) { + // single entity + if (entity == null || index > 0) { + return null; + } + return entity; + } else { + if (index >= entities.size()) { + return null; + } + return entities.get(index); + } + } + public void setEntity( final int index, final Entity entity){ - this.entities.set( index, entity ); + if (entities == null) { + this.entity = entity; + } else { + this.entities.set(index, entity); + } }
