USERGRID-1295 - Re-introduce a more efficient de-index upon entity delete and entity updates. Remove the inefficient code as a safety measure so it can't be used again.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/7143cbaf Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/7143cbaf Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/7143cbaf Branch: refs/heads/master Commit: 7143cbaf6c26b4db00d14fb3a1b9e3eb8a2e068e Parents: 66bb5cd Author: Michael Russo <[email protected]> Authored: Fri Jun 3 22:18:12 2016 -0700 Committer: Michael Russo <[email protected]> Committed: Fri Jun 3 22:18:12 2016 -0700 ---------------------------------------------------------------------- .../corepersistence/CpEntityManager.java | 2 +- .../corepersistence/EntityManagerFig.java | 2 +- .../asyncevents/AsyncEventService.java | 6 +- .../asyncevents/AsyncEventServiceImpl.java | 11 +- .../asyncevents/EventBuilder.java | 6 +- .../asyncevents/EventBuilderImpl.java | 46 ++++--- .../model/DeIndexOldVersionsEvent.java | 12 +- .../corepersistence/index/IndexService.java | 23 +++- .../corepersistence/index/IndexServiceImpl.java | 86 +++++++------ .../collection/EntityCollectionManager.java | 10 +- .../impl/EntityCollectionManagerImpl.java | 32 ++++- .../serialization/SerializationFig.java | 5 + .../serialization/impl/LogEntryIterator.java | 128 +++++++++++++++++++ .../usergrid/persistence/index/EntityIndex.java | 5 +- .../usergrid/persistence/index/IndexFig.java | 2 +- .../index/impl/EsEntityIndexImpl.java | 64 +++------- .../persistence/index/impl/EntityIndexTest.java | 41 ------ 17 files changed, 313 insertions(+), 168 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/usergrid/blob/7143cbaf/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java index 68f5d71..fd31cf6 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java @@ -610,7 +610,7 @@ public class CpEntityManager implements EntityManager { // queue up an event to clean-up older versions than this one from the index if (entityManagerFig.getDeindexOnUpdate()) { - indexService.queueDeIndexOldVersion( applicationScope, entityId ); + indexService.queueDeIndexOldVersion( applicationScope, cpEntity.getId(), cpEntity.getVersion()); } } } http://git-wip-us.apache.org/repos/asf/usergrid/blob/7143cbaf/stack/core/src/main/java/org/apache/usergrid/corepersistence/EntityManagerFig.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/EntityManagerFig.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/EntityManagerFig.java index 4c50aee..655a968 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/EntityManagerFig.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/EntityManagerFig.java @@ -39,6 +39,6 @@ public interface EntityManagerFig extends GuicyFig { int sleep(); @Key( "usergrid.entityManager.enable_deindex_on_update" ) - @Default( "false" ) + @Default( "true" ) boolean getDeindexOnUpdate(); } http://git-wip-us.apache.org/repos/asf/usergrid/blob/7143cbaf/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java index 9f34604..d833cf7 100644 --- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java @@ -27,6 +27,8 @@ import org.apache.usergrid.persistence.index.impl.IndexOperationMessage; import org.apache.usergrid.persistence.model.entity.Entity; import org.apache.usergrid.persistence.model.entity.Id; +import java.util.UUID; + /** * Low level queue service for events in the entity. These events are fire and forget, and will always be asynchronous @@ -83,11 +85,11 @@ public interface AsyncEventService extends ReIndexAction { void queueIndexOperationMessage( final IndexOperationMessage indexOperationMessage ); /** - * * @param applicationScope * @param entityId + * @param markedVersion */ - void queueDeIndexOldVersion(final ApplicationScope applicationScope, final Id entityId); + void queueDeIndexOldVersion(final ApplicationScope applicationScope, final Id entityId, UUID markedVersion); /** * current queue depth http://git-wip-us.apache.org/repos/asf/usergrid/blob/7143cbaf/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java index 3b01292..fa175ab 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java @@ -584,11 +584,11 @@ public class AsyncEventServiceImpl implements AsyncEventService { @Override - public void queueDeIndexOldVersion(final ApplicationScope applicationScope, final Id entityId) { + public void queueDeIndexOldVersion(final ApplicationScope applicationScope, final 
Id entityId, UUID markedVersion) { // queue the de-index of old versions to the topic so cleanup happens in all regions offerTopic( new DeIndexOldVersionsEvent( queueFig.getPrimaryRegion(), - new EntityIdScope( applicationScope, entityId)) ); + new EntityIdScope( applicationScope, entityId), markedVersion) ); } @@ -596,10 +596,11 @@ public class AsyncEventServiceImpl implements AsyncEventService { public IndexOperationMessage handleDeIndexOldVersionEvent ( final DeIndexOldVersionsEvent deIndexOldVersionsEvent){ - ApplicationScope applicationScope = deIndexOldVersionsEvent.getEntityIdScope().getApplicationScope(); - Id entityId = deIndexOldVersionsEvent.getEntityIdScope().getId(); + final ApplicationScope applicationScope = deIndexOldVersionsEvent.getEntityIdScope().getApplicationScope(); + final Id entityId = deIndexOldVersionsEvent.getEntityIdScope().getId(); + final UUID markedVersion = deIndexOldVersionsEvent.getMarkedVersion(); - return eventBuilder.deIndexOlderVersions( applicationScope, entityId ) + return eventBuilder.deIndexOldVersions( applicationScope, entityId, markedVersion ) .toBlocking().lastOrDefault(null); http://git-wip-us.apache.org/repos/asf/usergrid/blob/7143cbaf/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java index 1f62029..4db9f4b 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java @@ -21,12 +21,12 @@ package org.apache.usergrid.corepersistence.asyncevents; import java.util.List; +import java.util.UUID; import org.apache.usergrid.corepersistence.index.EntityIndexOperation; import 
org.apache.usergrid.persistence.collection.MvccLogEntry; import org.apache.usergrid.persistence.core.scope.ApplicationScope; import org.apache.usergrid.persistence.graph.Edge; -import org.apache.usergrid.persistence.index.impl.IndexOperation; import org.apache.usergrid.persistence.index.impl.IndexOperationMessage; import org.apache.usergrid.persistence.model.entity.Entity; import org.apache.usergrid.persistence.model.entity.Id; @@ -78,9 +78,11 @@ public interface EventBuilder { * Find all versions of the entity older than the latest and de-index them. * @param applicationScope * @param entityId + * @param markedVersion * @return */ - Observable<IndexOperationMessage> deIndexOlderVersions(ApplicationScope applicationScope, Id entityId ); + Observable<IndexOperationMessage> deIndexOldVersions( ApplicationScope applicationScope, + Id entityId, UUID markedVersion ); /** * A bean to hold both our observables so the caller can choose the subscription mechanism. Note that http://git-wip-us.apache.org/repos/asf/usergrid/blob/7143cbaf/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java index 5c827c6..02a7588 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java @@ -20,11 +20,10 @@ package org.apache.usergrid.corepersistence.asyncevents; -import java.util.Collections; +import java.util.ArrayList; import java.util.List; import java.util.UUID; -import org.apache.usergrid.persistence.collection.VersionSet; import org.apache.usergrid.utils.UUIDUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ 
-118,7 +117,7 @@ public class EventBuilderImpl implements EventBuilder { //TODO USERGRID-1123: Implement so we don't iterate logs twice (latest DELETED version, then to get all DELETED) - MvccLogEntry mostRecentlyMarked = ecm.getVersions( entityId ).toBlocking() + MvccLogEntry mostRecentlyMarked = ecm.getVersionsFromMaxToMin( entityId, UUIDUtils.newTimeUUID() ).toBlocking() .firstOrDefault( null, mvccLogEntry -> mvccLogEntry.getState() == MvccLogEntry.State.DELETED ); // De-indexing and entity deletes don't check log entries. We must do that first. If no DELETED logs, then @@ -127,13 +126,16 @@ public class EventBuilderImpl implements EventBuilder { Observable<List<MvccLogEntry>> ecmDeleteObservable = Observable.empty(); if(mostRecentlyMarked != null){ + + // fetch entity versions to be de-index by looking in cassandra deIndexObservable = - indexService.deleteEntityIndexes( applicationScope, entityId, mostRecentlyMarked.getVersion() ); + indexService.deIndexEntity(applicationScope, entityId, mostRecentlyMarked.getVersion(), + getVersionsOlderThanMarked(ecm, entityId, mostRecentlyMarked.getVersion())); ecmDeleteObservable = - ecm.getVersions( entityId ) + ecm.getVersionsFromMaxToMin( entityId, mostRecentlyMarked.getVersion() ) .filter( mvccLogEntry-> - UUIDUtils.compare(mvccLogEntry.getVersion(), mostRecentlyMarked.getVersion()) <= 0) + mvccLogEntry.getVersion().timestamp() <= mostRecentlyMarked.getVersion().timestamp() ) .buffer( serializationFig.getBufferSize() ) .doOnNext( buffer -> ecm.delete( buffer ) ); } @@ -173,7 +175,8 @@ public class EventBuilderImpl implements EventBuilder { @Override - public Observable<IndexOperationMessage> deIndexOlderVersions(final ApplicationScope applicationScope, Id entityId ){ + public Observable<IndexOperationMessage> deIndexOldVersions( final ApplicationScope applicationScope, + final Id entityId, final UUID markedVersion ){ if (logger.isDebugEnabled()) { logger.debug("Removing old versions of entity {} from index in app scope 
{}", entityId, applicationScope ); @@ -181,24 +184,31 @@ public class EventBuilderImpl implements EventBuilder { final EntityCollectionManager ecm = entityCollectionManagerFactory.createCollectionManager( applicationScope ); - // find all versions of the entity that come before the provided entityId - VersionSet latestVersions = ecm.getLatestVersion(Collections.singletonList(entityId) ).toBlocking() - .firstOrDefault( null ); - // If there are no versions before this, allow it to return an empty observable - Observable<IndexOperationMessage> deIndexObservable = Observable.empty(); + return indexService.deIndexOldVersions( applicationScope, entityId, + getVersionsOlderThanMarked(ecm, entityId, markedVersion), markedVersion); - if(latestVersions.getMaxVersion(entityId) != null){ + } - UUID latestVersion = latestVersions.getMaxVersion(entityId).getVersion(); - deIndexObservable = - indexService.deleteEntityIndexes( applicationScope, entityId, latestVersion); + private List<UUID> getVersionsOlderThanMarked( final EntityCollectionManager ecm, + final Id entityId, final UUID markedVersion ){ - } + final List<UUID> versions = new ArrayList<>(); + + // only take last 5 versions to avoid eating memory. 
a tool can be built for massive cleanups for old usergrid + // clusters that do not have this in-line cleanup + ecm.getVersionsFromMaxToMin( entityId, markedVersion) + .take(5) + .forEach( mvccLogEntry -> { + if ( mvccLogEntry.getVersion().timestamp() < markedVersion.timestamp() ) { + versions.add(mvccLogEntry.getVersion()); + } + + }); - return deIndexObservable; + return versions; } } http://git-wip-us.apache.org/repos/asf/usergrid/blob/7143cbaf/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/DeIndexOldVersionsEvent.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/DeIndexOldVersionsEvent.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/DeIndexOldVersionsEvent.java index 59694d5..1f00e14 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/DeIndexOldVersionsEvent.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/DeIndexOldVersionsEvent.java @@ -21,6 +21,8 @@ package org.apache.usergrid.corepersistence.asyncevents.model; import com.fasterxml.jackson.annotation.JsonProperty; import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope; +import java.util.UUID; + /** * An index event de-indexing documents for Entity versions older than the provided Entity @@ -31,12 +33,16 @@ public final class DeIndexOldVersionsEvent extends AsyncEvent { @JsonProperty protected EntityIdScope entityIdScope; + @JsonProperty + protected UUID markedVersion; + public DeIndexOldVersionsEvent() { } - public DeIndexOldVersionsEvent(String sourceRegion, EntityIdScope entityIdScope) { + public DeIndexOldVersionsEvent(String sourceRegion, EntityIdScope entityIdScope, UUID markedVersion) { super(sourceRegion); this.entityIdScope = entityIdScope; + this.markedVersion = markedVersion; } @@ -47,4 +53,8 @@ 
public final class DeIndexOldVersionsEvent extends AsyncEvent { public EntityIdScope getEntityIdScope() { return entityIdScope; } + + public UUID getMarkedVersion() { + return markedVersion; + } } http://git-wip-us.apache.org/repos/asf/usergrid/blob/7143cbaf/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexService.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexService.java index 54eb464..b989a9c 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexService.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexService.java @@ -20,17 +20,16 @@ package org.apache.usergrid.corepersistence.index; +import java.util.List; import java.util.UUID; import org.apache.usergrid.persistence.core.scope.ApplicationScope; import org.apache.usergrid.persistence.graph.Edge; -import org.apache.usergrid.persistence.index.IndexEdge; import org.apache.usergrid.persistence.index.impl.IndexOperationMessage; import org.apache.usergrid.persistence.model.entity.Entity; import org.apache.usergrid.persistence.model.entity.Id; import rx.Observable; -import rx.observables.ConnectableObservable; /** @@ -71,18 +70,28 @@ public interface IndexService { Observable<IndexOperationMessage> deleteIndexEdge(final ApplicationScope applicationScope, final Edge edge); - - /** - * Delete all indexes with the specified entityId + * De-index all documents with the specified entityId and versions provided. This will also remove any documents + * where the entity is a source/target node ( index docs where this entityId is a part of connections). 
* * @param applicationScope * @param entityId + * @param markedVersion * @return */ - Observable<IndexOperationMessage> deleteEntityIndexes(final ApplicationScope applicationScope, final Id entityId, - final UUID markedVersion); + Observable<IndexOperationMessage> deIndexEntity(final ApplicationScope applicationScope, final Id entityId, + final UUID markedVersion, final List<UUID> allVersionsBeforeMarked); + /** + * De-index all documents with the specified entityId and versions of the entityId provided + * + * @param applicationScope + * @param entityId + * @param markedVersion + * @return + */ + Observable<IndexOperationMessage> deIndexOldVersions(final ApplicationScope applicationScope, final Id entityId, + final List<UUID> versions, UUID markedVersion); } http://git-wip-us.apache.org/repos/asf/usergrid/blob/7143cbaf/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java index 9509626..54b18bb 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java @@ -20,12 +20,7 @@ package org.apache.usergrid.corepersistence.index; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.Map; -import java.util.Set; -import java.util.UUID; -import java.util.concurrent.TimeUnit; +import java.util.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -56,8 +51,6 @@ import org.apache.usergrid.persistence.model.entity.Entity; import org.apache.usergrid.persistence.model.entity.Id; import org.apache.usergrid.persistence.model.entity.SimpleId; import org.apache.usergrid.utils.InflectionUtils; -import 
org.apache.usergrid.utils.JsonUtils; -import org.apache.usergrid.utils.UUIDUtils; import com.codahale.metrics.Timer; import com.google.common.base.Optional; @@ -66,9 +59,7 @@ import com.google.inject.Singleton; import rx.Observable; -import static org.apache.usergrid.corepersistence.util.CpNamingUtils.createSearchEdgeFromSource; -import static org.apache.usergrid.corepersistence.util.CpNamingUtils.generateScopeFromSource; -import static org.apache.usergrid.corepersistence.util.CpNamingUtils.generateScopeFromTarget; +import static org.apache.usergrid.corepersistence.util.CpNamingUtils.*; import static org.apache.usergrid.persistence.Schema.TYPE_APPLICATION; @@ -278,42 +269,61 @@ public class IndexServiceImpl implements IndexService { return ObservableTimer.time( batches, addTimer ); } - //This should look up the entityId and delete any documents with a timestamp that comes before - //The edges that are connected will be compacted away from the graph. + @Override - public Observable<IndexOperationMessage> deleteEntityIndexes( final ApplicationScope applicationScope, - final Id entityId, final UUID markedVersion ) { + public Observable<IndexOperationMessage> deIndexEntity( final ApplicationScope applicationScope, final Id entityId, + final UUID markedVersion, + final List<UUID> allVersionsBeforeMarked ) { - //bootstrap the lower modules from their caches - final EntityIndex ei = entityIndexFactory.createEntityIndex(indexLocationStrategyFactory.getIndexLocationStrategy(applicationScope) ); + final EntityIndex ei = entityIndexFactory. + createEntityIndex(indexLocationStrategyFactory.getIndexLocationStrategy(applicationScope) ); - CandidateResults crs = ei.getAllEntityVersionsBeforeMarkedVersion( entityId, markedVersion ); - //If we get no search results, its possible that something was already deleted or - //that it wasn't indexed yet. In either case we can't delete anything and return an empty observable.. 
- if(crs.isEmpty()) { - return Observable.empty(); - } + final SearchEdge searchEdgeFromSource = createSearchEdgeFromSource( new SimpleEdge( applicationScope.getApplication(), + CpNamingUtils.getEdgeTypeFromCollectionName( InflectionUtils.pluralize( entityId.getType() ) ), entityId, + entityId.getUuid().timestamp() ) ); + + + final EntityIndexBatch batch = ei.createBatch(); + + // de-index each version of the entity before the marked version + allVersionsBeforeMarked.forEach(version -> batch.deindex(searchEdgeFromSource, entityId, version)); - UUID timeUUID = UUIDUtils.isTimeBased(entityId.getUuid()) ? entityId.getUuid() : UUIDUtils.newTimeUUID(); - //not actually sure about the timestamp but ah well. works. - SearchEdge searchEdge = createSearchEdgeFromSource( new SimpleEdge( applicationScope.getApplication(), + + // for now, query the index to remove docs where the entity is source/target node and older than markedVersion + // TODO: investigate getting this information from graph + CandidateResults candidateResults = ei.getNodeDocsOlderThanMarked(entityId, markedVersion ); + candidateResults.forEach(candidateResult -> batch.deindex(candidateResult)); + + return Observable.just(batch.build()); + + } + + @Override + public Observable<IndexOperationMessage> deIndexOldVersions(final ApplicationScope applicationScope, + final Id entityId, + final List<UUID> versions, + UUID markedVersion) { + + final EntityIndex ei = entityIndexFactory. 
+ createEntityIndex(indexLocationStrategyFactory.getIndexLocationStrategy(applicationScope) ); + + + final SearchEdge searchEdgeFromSource = createSearchEdgeFromSource( new SimpleEdge( applicationScope.getApplication(), CpNamingUtils.getEdgeTypeFromCollectionName( InflectionUtils.pluralize( entityId.getType() ) ), entityId, - timeUUID.timestamp() ) ); + entityId.getUuid().timestamp() ) ); - final Observable<IndexOperationMessage> batches = Observable.from( crs ) - //collect results into a single batch - .collect( () -> ei.createBatch(), ( batch, candidateResult ) -> { - if (logger.isDebugEnabled()) { - logger.debug("Deindexing on edge {} for entity {} added to batch", searchEdge, entityId); - } - batch.deindex( candidateResult ); - } ) - //return the future from the batch execution - .map( batch ->batch.build() ); + final EntityIndexBatch batch = ei.createBatch(); + + versions.forEach( version -> { + + batch.deindex(searchEdgeFromSource, entityId, version); + + }); + + return Observable.just(batch.build()); - return ObservableTimer.time(batches, indexTimer); } /** http://git-wip-us.apache.org/repos/asf/usergrid/blob/7143cbaf/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java index 9de8f41..22fbb5f 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java @@ -20,6 +20,7 @@ package org.apache.usergrid.persistence.collection; import java.util.Collection; +import java.util.UUID; import 
org.apache.usergrid.persistence.core.util.Health; import org.apache.usergrid.persistence.model.entity.Entity; @@ -98,13 +99,20 @@ public interface EntityCollectionManager { Observable<EntitySet> load( Collection<Id> entityIds ); /** - * Get all versions of the log entry, from Max to min + * Get all versions of the log entry, from min to max * @param entityId * @return An observable stream of mvccLog entries */ Observable<MvccLogEntry> getVersions(final Id entityId); /** + * Get all versions of the log entry, from max to min + * @param entityId + * @return An observable stream of mvccLog entries + */ + Observable<MvccLogEntry> getVersionsFromMaxToMin(final Id entityId, final UUID startVersion); + + /** * Delete these versions from cassandra. Must be atomic so that read log entries are only removed. Entity data * and log entry will be deleted * @param entries http://git-wip-us.apache.org/repos/asf/usergrid/blob/7143cbaf/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java index e71e6bb..70b06ba 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java @@ -27,6 +27,7 @@ import java.util.List; import java.util.UUID; import com.netflix.astyanax.model.ConsistencyLevel; +import org.apache.usergrid.persistence.collection.serialization.impl.LogEntryIterator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -279,6 +280,19 @@ public class 
EntityCollectionManagerImpl implements EntityCollectionManager { } ); } + @Override + public Observable<MvccLogEntry> getVersionsFromMaxToMin( final Id entityId, final UUID startVersion ) { + ValidationUtils.verifyIdentity( entityId ); + + return Observable.create( new ObservableIterator<MvccLogEntry>( "Log entry iterator" ) { + @Override + protected Iterator<MvccLogEntry> getIterator() { + return new LogEntryIterator( mvccLogEntrySerializationStrategy, applicationScope, entityId, startVersion, + serializationFig.getBufferSize() ); + } + } ); + } + @Override public Observable<MvccLogEntry> delete( final Collection<MvccLogEntry> entries ) { @@ -359,6 +373,7 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager { if ( entity == null || !entity.getEntity().isPresent() ) { final MutationBatch valueDelete = uniqueValueSerializationStrategy.delete( applicationScope, expectedUnique ); + deleteBatch.mergeShallow( valueDelete ); continue; } @@ -370,10 +385,23 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager { response.addEntity( expectedUnique.getField(), entity ); } - //TODO: explore making this an Async process - //We'll repair it again if we have to deleteBatch.execute(); + // optionally sleep after read repair as some tasks immediately try to write after the delete + if ( serializationFig.getReadRepairDelay() > 0 ){ + + try { + + Thread.sleep(Math.min(serializationFig.getReadRepairDelay(), 200L)); + + } catch (InterruptedException e) { + + // do nothing if sleep fails; log and continue on + logger.warn("Sleep during unique value read repair failed."); + } + + } + return response; } catch ( ConnectionException e ) { http://git-wip-us.apache.org/repos/asf/usergrid/blob/7143cbaf/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/SerializationFig.java ---------------------------------------------------------------------- diff --git 
a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/SerializationFig.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/SerializationFig.java index 96759ba..12033fe 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/SerializationFig.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/SerializationFig.java @@ -57,4 +57,9 @@ public interface SerializationFig extends GuicyFig { @Key ( "usergrid.uniqueverify.poolsize" ) @Default( "150" ) int getUniqueVerifyPoolSize(); + + + @Key ( "collection.readrepair.delay" ) + @Default( "0" ) // in milliseconds + int getReadRepairDelay(); } http://git-wip-us.apache.org/repos/asf/usergrid/blob/7143cbaf/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/LogEntryIterator.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/LogEntryIterator.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/LogEntryIterator.java new file mode 100644 index 0000000..de6b2bc --- /dev/null +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/LogEntryIterator.java @@ -0,0 +1,128 @@ +package org.apache.usergrid.persistence.collection.serialization.impl; + + +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.UUID; + +import org.apache.usergrid.persistence.collection.MvccLogEntry; +import org.apache.usergrid.persistence.collection.serialization.MvccLogEntrySerializationStrategy; +import 
org.apache.usergrid.persistence.core.scope.ApplicationScope; +import org.apache.usergrid.persistence.model.entity.Id; + +import com.google.common.base.Preconditions; +import com.netflix.astyanax.connectionpool.exceptions.ConnectionException; + + +/** + * Iterator that will iterate all versions of the entity from max to min, loading the log entries from the serialization strategy one page at a time + */ +public class LogEntryIterator implements Iterator<MvccLogEntry> { + + + private final MvccLogEntrySerializationStrategy logEntrySerializationStrategy; + private final ApplicationScope scope; + private final Id entityId; + private final int pageSize; + + + private Iterator<MvccLogEntry> elementItr; + private UUID nextStart; + private UUID startVersion; + + + /** + * @param logEntrySerializationStrategy The serialization strategy to get the log entries + * @param scope The scope of the entity + * @param entityId The id of the entity + * @param startVersion The version handed to the serialization strategy as the start of the first page + * @param pageSize The fetch size to get when querying the serialization strategy, must be > 0 + */ + public LogEntryIterator( final MvccLogEntrySerializationStrategy logEntrySerializationStrategy, + final ApplicationScope scope, final Id entityId, + final UUID startVersion, final int pageSize ) { + + Preconditions.checkArgument( pageSize > 0, "pageSize must be > 0" ); + + this.logEntrySerializationStrategy = logEntrySerializationStrategy; + this.scope = scope; + this.entityId = entityId; + this.pageSize = pageSize; + this.startVersion = startVersion; + + } + + + @Override + public boolean hasNext() { + if ( elementItr == null || !elementItr.hasNext() && nextStart != null ) { + try { + advance(); + } + catch ( ConnectionException e ) { + throw new RuntimeException( "Unable to query cassandra", e ); + } + } + + return elementItr.hasNext(); + } + + + @Override + public MvccLogEntry next() { + if ( !hasNext() ) { + throw new NoSuchElementException( "No more elements exist" ); + } + + return elementItr.next(); + } + + + @Override + public void remove() { + throw new UnsupportedOperationException( "Remove is unsupported" ); + } + 
+ + /** + * Advance our iterator by loading the next page of log entries and pointing the element iterator at it + * + * @throws ConnectionException if loading from the serialization strategy fails + */ + public void advance() throws ConnectionException { + + final int requestedSize; + UUID start; + + if ( nextStart != null ) { + requestedSize = pageSize + 1; + start = nextStart; + } + else { + requestedSize = pageSize; + start = startVersion; + } + + //load the next page of log entries, beginning at the start version + List<MvccLogEntry> results = logEntrySerializationStrategy.load( scope, entityId, start, requestedSize ); + + //we always remove the first version if it's equal to nextStart, since it was already returned as the last entry of the previous page + if ( nextStart != null && results.size() > 0 && results.get( 0 ).getVersion().equals( nextStart ) ) { + results.remove( 0 ); + } + + + + //we have results, set our next start. If we miss our start version (due to deletion) and we request a +1, we want to ensure we set our next, hence the >= + if ( results.size() >= pageSize ) { + nextStart = results.get( results.size() - 1 ).getVersion(); + } + //nothing left to do + else { + nextStart = null; + } + + + + + elementItr = results.iterator(); + } +} http://git-wip-us.apache.org/repos/asf/usergrid/blob/7143cbaf/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndex.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndex.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndex.java index 81f900c..8ab2d41 100644 --- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndex.java +++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndex.java @@ -113,14 +113,13 @@ public interface EntityIndex extends CPManager { CandidateResults getAllEdgeDocuments(final IndexEdge edge, final Id entityId); /** - * Returns all entity documents that match the entityId and come before the marked version + * Returns all 
entity docs that match the entityId being the nodeId ( aka connections where entityId = sourceNode) * * @param entityId The entityId to match when searching * @param markedVersion The version that has been marked for deletion. All versions before this one must be deleted. * @return */ - CandidateResults getAllEntityVersionsBeforeMarkedVersion(final Id entityId, final UUID markedVersion); - + CandidateResults getNodeDocsOlderThanMarked(final Id entityId, final UUID markedVersion); /** * delete all application records * http://git-wip-us.apache.org/repos/asf/usergrid/blob/7143cbaf/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java index e81219a..3dbf5e5 100644 --- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java +++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java @@ -202,7 +202,7 @@ public interface IndexFig extends GuicyFig { @Key( "elasticsearch_queue_error_sleep_ms" ) long getSleepTimeForQueueError(); - @Default("1000") + @Default("100") @Key( ELASTICSEARCH_VERSION_QUERY_LIMIT ) int getVersionQueryLimit(); } http://git-wip-us.apache.org/repos/asf/usergrid/blob/7143cbaf/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java index 10ee91e..3b60b57 100644 --- 
a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java +++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java @@ -533,7 +533,9 @@ public class EsEntityIndexImpl implements EntityIndex,VersionedData { @Override - public CandidateResults getAllEntityVersionsBeforeMarkedVersion( final Id entityId, final UUID markedVersion ) { + public CandidateResults getNodeDocsOlderThanMarked(final Id entityId, final UUID markedVersion ) { + + // TODO: investigate if functionality via iterator so a caller can page the deletion until all is gone Preconditions.checkNotNull( entityId, "entityId cannot be null" ); Preconditions.checkNotNull(markedVersion, "markedVersion cannot be null"); @@ -544,12 +546,8 @@ public class EsEntityIndexImpl implements EntityIndex,VersionedData { final long markedTimestamp = markedVersion.timestamp(); - // never let the limit be less than 2 as there are potential indefinite paging issues - final int searchLimit = Math.max(2, indexFig.getVersionQueryLimit()); - - // this query will find the document for the entity itself - final QueryBuilder entityQuery = QueryBuilders - .termQuery(IndexingUtils.ENTITY_ID_FIELDNAME, IndexingUtils.entityId(entityId)); + // never let this fetch more than 100 to save memory + final int searchLimit = Math.min(100, indexFig.getVersionQueryLimit()); // this query will find all the documents where this entity is a source/target node final QueryBuilder nodeQuery = QueryBuilders @@ -562,49 +560,25 @@ public class EsEntityIndexImpl implements EntityIndex,VersionedData { long queryTimestamp = 0L; - while(true){ + QueryBuilder timestampQuery = QueryBuilders + .rangeQuery(IndexingUtils.EDGE_TIMESTAMP_FIELDNAME) + .gte(queryTimestamp) + .lt(markedTimestamp); - QueryBuilder timestampQuery = QueryBuilders - .rangeQuery(IndexingUtils.EDGE_TIMESTAMP_FIELDNAME) - .gte(queryTimestamp) - .lte(markedTimestamp); + 
QueryBuilder finalQuery = QueryBuilders + .boolQuery() + .must(timestampQuery) + .must(nodeQuery); - QueryBuilder entityQueryWithTimestamp = QueryBuilders - .boolQuery() - .must(entityQuery) - .must(timestampQuery); + searchResponse = srb + .setQuery(finalQuery) + .setSize(searchLimit) + .execute() + .actionGet(); - QueryBuilder finalQuery = QueryBuilders - .boolQuery() - .should(entityQueryWithTimestamp) - .should(nodeQuery) - .minimumNumberShouldMatch(1); - searchResponse = srb - .setQuery(finalQuery) - .setSize(searchLimit) - .execute() - .actionGet(); + candidates = aggregateScrollResults(candidates, searchResponse, markedVersion); - int responseSize = searchResponse.getHits().getHits().length; - if(responseSize == 0){ - break; - } - - candidates = aggregateScrollResults(candidates, searchResponse, markedVersion); - - // update queryTimestamp to be the timestamp of the last entity returned from the query - queryTimestamp = (long) searchResponse - .getHits().getAt(responseSize - 1) - .getSource().get(IndexingUtils.EDGE_TIMESTAMP_FIELDNAME); - - - if(responseSize < searchLimit){ - - break; - } - - } } catch ( Throwable t ) { logger.error( "Unable to communicate with Elasticsearch", t.getMessage() ); http://git-wip-us.apache.org/repos/asf/usergrid/blob/7143cbaf/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java index 008ec80..c84635d 100644 --- a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java +++ b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java @@ -383,47 +383,6 @@ public class 
EntityIndexTest extends BaseIT { } - /** - * Tests that we aggregate results only before the halfway version point. - */ - @Test - public void testScollingDeindex() { - - int numberOfEntities = 1000; - int versionToSearchFor = numberOfEntities / 2; - - - UUID entityUUID = UUID.randomUUID(); - Id entityId = new SimpleId( "mehCar" ); - - Map entityMap = new HashMap() {{ - put( "name", "Toyota Corolla" ); - put( "introduced", 1966 ); - put( "topspeed", 111 ); - }}; - - Entity[] entity = new Entity[numberOfEntities]; - for(int i = 0; i < numberOfEntities; i++) { - entity[i] = EntityIndexMapUtils.fromMap( entityMap ); - EntityUtils.setId(entity[i], entityId); - EntityUtils.setVersion(entity[i], UUIDGenerator.newTimeUUID()); - entity[i].setField(new UUIDField(IndexingUtils.ENTITY_ID_FIELDNAME, entityUUID)); - - IndexEdge searchEdge = new IndexEdgeImpl( appId, "mehCars", SearchEdge.NodeType.SOURCE, System.currentTimeMillis()*1000 ); - - //index the new entity. This is where the loop will be set to create like 100 entities. - indexProducer.put(entityIndex.createBatch().index( searchEdge, entity[i] ).build()).subscribe(); - - } - entityIndex.refreshAsync().toBlocking().first(); - - CandidateResults candidateResults = entityIndex - .getAllEntityVersionsBeforeMarkedVersion( entity[versionToSearchFor].getId(), - entity[versionToSearchFor].getVersion() ); - assertEquals( 501, candidateResults.size() ); - } - - private CandidateResults testQuery( final SearchEdge scope, final SearchTypes searchTypes, final String queryString,
