Clean up older versions of entities from the index after an entity update.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/e57e9ef4 Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/e57e9ef4 Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/e57e9ef4 Branch: refs/heads/release-2.1.1 Commit: e57e9ef44809b14e3462a5540d06c1ab6dd5ff15 Parents: 9b9f835 Author: Michael Russo <[email protected]> Authored: Tue Mar 29 12:47:33 2016 -0700 Committer: Michael Russo <[email protected]> Committed: Tue Mar 29 12:47:33 2016 -0700 ---------------------------------------------------------------------- .../corepersistence/CpEntityManager.java | 7 ++- .../asyncevents/AsyncEventService.java | 7 +++ .../asyncevents/AsyncEventServiceImpl.java | 33 +++++++++++-- .../asyncevents/EventBuilder.java | 12 ++++- .../asyncevents/EventBuilderImpl.java | 36 +++++++++++++- .../asyncevents/model/AsyncEvent.java | 5 +- .../model/DeIndexOldVersionsEvent.java | 50 ++++++++++++++++++++ .../asyncevents/model/EdgeDeleteEvent.java | 4 +- .../model/ElasticsearchIndexEvent.java | 2 +- .../asyncevents/model/EntityDeleteEvent.java | 3 ++ .../model/InitializeApplicationIndexEvent.java | 2 +- 11 files changed, 147 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/usergrid/blob/e57e9ef4/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java index 3dbdb7d..b29e6d3 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java @@ -588,9 +588,12 @@ public class CpEntityManager implements EntityManager { handleWriteUniqueVerifyException( entity, wuve ); } - // update in all containing collections and connection indexes + // queue an event to update the new entity + indexService.queueEntityIndexUpdate( applicationScope, cpEntity, 0 ); - indexService.queueEntityIndexUpdate( applicationScope, cpEntity, 0); + + // queue up an event to clean-up older versions than this one from the index + indexService.queueDeIndexOldVersion( applicationScope, entityId ); } http://git-wip-us.apache.org/repos/asf/usergrid/blob/e57e9ef4/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java index 1abf83f..9f34604 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java @@ -83,6 +83,13 @@ public interface AsyncEventService extends ReIndexAction { void queueIndexOperationMessage( final IndexOperationMessage indexOperationMessage ); /** + * + * @param applicationScope + * @param entityId + */ + void queueDeIndexOldVersion(final ApplicationScope applicationScope, final Id entityId); + + /** * current queue depth * @return */ http://git-wip-us.apache.org/repos/asf/usergrid/blob/e57e9ef4/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java index 1349011..d180919 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java @@ -28,15 +28,11 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; import java.util.stream.Stream; +import org.apache.usergrid.corepersistence.asyncevents.model.*; import org.apache.usergrid.persistence.index.impl.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.usergrid.corepersistence.asyncevents.model.AsyncEvent; -import org.apache.usergrid.corepersistence.asyncevents.model.EdgeDeleteEvent; -import org.apache.usergrid.corepersistence.asyncevents.model.ElasticsearchIndexEvent; -import org.apache.usergrid.corepersistence.asyncevents.model.EntityDeleteEvent; -import org.apache.usergrid.corepersistence.asyncevents.model.InitializeApplicationIndexEvent; import org.apache.usergrid.corepersistence.index.EntityIndexOperation; import org.apache.usergrid.corepersistence.index.IndexLocationStrategyFactory; import org.apache.usergrid.corepersistence.index.IndexProcessorFig; @@ -337,6 +333,10 @@ public class AsyncEventServiceImpl implements AsyncEventService { indexOperationMessage = handleIndexOperation((ElasticsearchIndexEvent) event); + } else if (event instanceof DeIndexOldVersionsEvent) { + + indexOperationMessage = handleDeIndexOldVersionEvent((DeIndexOldVersionsEvent) event); + } else { throw new Exception("Unknown EventType for message: "+ message.getStringBody().trim()); @@ -533,6 +533,29 @@ public class AsyncEventServiceImpl implements AsyncEventService { } + + @Override + public void queueDeIndexOldVersion(final ApplicationScope applicationScope, final Id entityId) { + + // queue the de-index of old versions to the topic so cleanup happens in all regions + offerTopic( new DeIndexOldVersionsEvent( queueFig.getPrimaryRegion(), + new EntityIdScope( applicationScope, entityId)) ); + + } + + + public IndexOperationMessage handleDeIndexOldVersionEvent ( final DeIndexOldVersionsEvent deIndexOldVersionsEvent){ + + + ApplicationScope applicationScope = deIndexOldVersionsEvent.getEntityIdScope().getApplicationScope(); + Id entityId = deIndexOldVersionsEvent.getEntityIdScope().getId(); + + return eventBuilder.deIndexOlderVersions( applicationScope, entityId ) + .toBlocking().lastOrDefault(null); + + + } + /** * this method will call initialize for each message, since we are caching the entity indexes, * we don't worry about aggregating by app id http://git-wip-us.apache.org/repos/asf/usergrid/blob/e57e9ef4/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java index a47ec77..1f62029 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java @@ -26,6 +26,7 @@ import org.apache.usergrid.corepersistence.index.EntityIndexOperation; import org.apache.usergrid.persistence.collection.MvccLogEntry; import org.apache.usergrid.persistence.core.scope.ApplicationScope; import org.apache.usergrid.persistence.graph.Edge; +import org.apache.usergrid.persistence.index.impl.IndexOperation; import org.apache.usergrid.persistence.index.impl.IndexOperationMessage; import org.apache.usergrid.persistence.model.entity.Entity; import org.apache.usergrid.persistence.model.entity.Id; @@ -56,7 +57,7 @@ public interface EventBuilder { Observable<IndexOperationMessage> buildDeleteEdge( ApplicationScope applicationScope, Edge edge ); /** - * Return a ben with 2 obervable streams for entity delete. + * Return a bin with 2 observable streams for entity delete. * @param applicationScope * @param entityId * @return @@ -72,6 +73,15 @@ public interface EventBuilder { */ Observable<IndexOperationMessage> buildEntityIndex( EntityIndexOperation entityIndexOperation ); + + /** + * Find all versions of the entity older than the latest and de-index them. + * @param applicationScope + * @param entityId + * @return + */ + Observable<IndexOperationMessage> deIndexOlderVersions(ApplicationScope applicationScope, Id entityId ); + /** * A bean to hold both our observables so the caller can choose the subscription mechanism. Note that * indexOperationMessages should be subscribed and completed BEFORE the getEntitiesDeleted is subscribed http://git-wip-us.apache.org/repos/asf/usergrid/blob/e57e9ef4/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java index 2edc668..9851936 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java @@ -20,8 +20,11 @@ package org.apache.usergrid.corepersistence.asyncevents; +import java.util.Collections; import java.util.List; +import java.util.UUID; +import org.apache.usergrid.persistence.collection.VersionSet; import org.apache.usergrid.utils.UUIDUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -118,7 +121,7 @@ public class EventBuilderImpl implements EventBuilder { MvccLogEntry mostRecentlyMarked = ecm.getVersions( entityId ).toBlocking() .firstOrDefault( null, mvccLogEntry -> mvccLogEntry.getState() == MvccLogEntry.State.DELETED ); - // De-indexing and entity deletes don't check log entiries. We must do that first. If no DELETED logs, then + // De-indexing and entity deletes don't check log entries. We must do that first. If no DELETED logs, then // return an empty observable as our no-op. Observable<IndexOperationMessage> deIndexObservable = Observable.empty(); Observable<List<MvccLogEntry>> ecmDeleteObservable = Observable.empty(); @@ -168,4 +171,35 @@ public class EventBuilderImpl implements EventBuilder { //perform indexing on the task scheduler and start it .flatMap( entity -> indexService.indexEntity( applicationScope, entity ) ); } + + + @Override + public Observable<IndexOperationMessage> deIndexOlderVersions(final ApplicationScope applicationScope, Id entityId ){ + + if (logger.isDebugEnabled()) { + logger.debug("Removing old versions of entity {} from index in app scope {}", entityId, applicationScope ); + } + + final EntityCollectionManager ecm = entityCollectionManagerFactory.createCollectionManager( applicationScope ); + + // find all versions of the entity that come before the provided entityId + VersionSet latestVersions = ecm.getLatestVersion(Collections.singletonList(entityId) ).toBlocking() + .firstOrDefault( null ); + + // If there are no versions before this, allow it to return an empty observable + Observable<IndexOperationMessage> deIndexObservable = Observable.empty(); + + if(latestVersions.getMaxVersion(entityId) != null){ + + UUID latestVersion = latestVersions.getMaxVersion(entityId).getVersion(); + + deIndexObservable = + indexService.deleteEntityIndexes( applicationScope, entityId, latestVersion); + + } + + return deIndexObservable; + + } + } http://git-wip-us.apache.org/repos/asf/usergrid/blob/e57e9ef4/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java index 57b5812..7b2b228 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java @@ -26,7 +26,6 @@ import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonSubTypes; import com.fasterxml.jackson.annotation.JsonTypeInfo; -import org.apache.usergrid.persistence.queue.QueueFig; /** @@ -42,7 +41,9 @@ import org.apache.usergrid.persistence.queue.QueueFig; @JsonSubTypes.Type( value = EdgeDeleteEvent.class, name = "edgeDeleteEvent" ), @JsonSubTypes.Type( value = EntityDeleteEvent.class, name = "entityDeleteEvent" ), @JsonSubTypes.Type( value = InitializeApplicationIndexEvent.class, name = "initializeApplicationIndexEvent" ), - @JsonSubTypes.Type( value = ElasticsearchIndexEvent.class, name = "elasticsearchIndexEvent" ) + @JsonSubTypes.Type( value = ElasticsearchIndexEvent.class, name = "elasticsearchIndexEvent" ), + @JsonSubTypes.Type( value = DeIndexOldVersionsEvent.class, name = "deIndexOldVersionsEvent" ) + } ) public abstract class AsyncEvent implements Serializable { http://git-wip-us.apache.org/repos/asf/usergrid/blob/e57e9ef4/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/DeIndexOldVersionsEvent.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/DeIndexOldVersionsEvent.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/DeIndexOldVersionsEvent.java new file mode 100644 index 0000000..59694d5 --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/DeIndexOldVersionsEvent.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.usergrid.corepersistence.asyncevents.model; + + +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope; + + +/** + * An index event de-indexing documents for Entity versions older than the provided Entity + */ +public final class DeIndexOldVersionsEvent extends AsyncEvent { + + + @JsonProperty + protected EntityIdScope entityIdScope; + + public DeIndexOldVersionsEvent() { + } + + public DeIndexOldVersionsEvent(String sourceRegion, EntityIdScope entityIdScope) { + super(sourceRegion); + this.entityIdScope = entityIdScope; + } + + + /** + * Get the unique message id of the + * @return + */ + public EntityIdScope getEntityIdScope() { + return entityIdScope; + } +} http://git-wip-us.apache.org/repos/asf/usergrid/blob/e57e9ef4/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EdgeDeleteEvent.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EdgeDeleteEvent.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EdgeDeleteEvent.java index 4bbe6f5..572ec13 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EdgeDeleteEvent.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EdgeDeleteEvent.java @@ -25,7 +25,9 @@ import org.apache.usergrid.persistence.graph.Edge; import com.fasterxml.jackson.annotation.JsonProperty; - +/** + * Event that will signal to finish the actual delete (post-mark delete) for an Edge + */ public final class EdgeDeleteEvent extends AsyncEvent { @JsonProperty http://git-wip-us.apache.org/repos/asf/usergrid/blob/e57e9ef4/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/ElasticsearchIndexEvent.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/ElasticsearchIndexEvent.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/ElasticsearchIndexEvent.java index 049c3a5..432dd63 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/ElasticsearchIndexEvent.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/ElasticsearchIndexEvent.java @@ -24,7 +24,7 @@ import com.fasterxml.jackson.annotation.JsonProperty; /** - * An index event for publishing to elastic search + * An index event for publishing operations (index and de-index) to Elasticsearch */ public final class ElasticsearchIndexEvent extends AsyncEvent { http://git-wip-us.apache.org/repos/asf/usergrid/blob/e57e9ef4/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EntityDeleteEvent.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EntityDeleteEvent.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EntityDeleteEvent.java index cb3ecda..01d2ba8 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EntityDeleteEvent.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EntityDeleteEvent.java @@ -22,6 +22,9 @@ package org.apache.usergrid.corepersistence.asyncevents.model; import com.fasterxml.jackson.annotation.JsonProperty; import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope; +/** + * Event that will signal to finish the actual delete (post-mark delete) for an Entity + */ public final class EntityDeleteEvent extends AsyncEvent { http://git-wip-us.apache.org/repos/asf/usergrid/blob/e57e9ef4/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/InitializeApplicationIndexEvent.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/InitializeApplicationIndexEvent.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/InitializeApplicationIndexEvent.java index 1a270d4..fa72249 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/InitializeApplicationIndexEvent.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/InitializeApplicationIndexEvent.java @@ -25,7 +25,7 @@ import org.apache.usergrid.corepersistence.index.ReplicatedIndexLocationStrategy import org.apache.usergrid.persistence.index.IndexLocationStrategy; /** - * event to init app index + * Event to initialize the index for an application */ public class InitializeApplicationIndexEvent extends AsyncEvent {
