First pass at refactoring mark + sweep
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/3e2afe23 Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/3e2afe23 Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/3e2afe23 Branch: refs/heads/USERGRID-641 Commit: 3e2afe23bb14761b395acbe1b6577ef024b0c3dd Parents: 36b5bad Author: Todd Nine <[email protected]> Authored: Tue May 5 17:30:08 2015 -0600 Committer: Todd Nine <[email protected]> Committed: Sun May 10 04:59:16 2015 -0600 ---------------------------------------------------------------------- .../corepersistence/CpEntityManager.java | 2 +- .../corepersistence/CpEntityManagerFactory.java | 2 +- .../corepersistence/CpRelationManager.java | 4 +- .../migration/AppInfoMigrationPlugin.java | 3 +- .../collection/EntityCollectionManager.java | 20 +- .../cache/CachedEntityCollectionManager.java | 18 +- .../collection/event/EntityDeleted.java | 45 -- .../collection/event/EntityVersionCreated.java | 39 - .../collection/event/EntityVersionDeleted.java | 46 -- .../collection/guice/CollectionModule.java | 39 - .../EntityCollectionManagerFactoryImpl.java | 31 +- .../impl/EntityCollectionManagerImpl.java | 386 ++++------ .../collection/impl/EntityDeletedTask.java | 137 ---- .../impl/EntityVersionCleanupTask.java | 247 ------- .../impl/EntityVersionCreatedTask.java | 115 --- .../impl/EntityVersionTaskFactory.java | 60 -- .../mvcc/stage/delete/UniqueCleanup.java | 133 ++++ .../mvcc/stage/delete/VersionCompact.java | 120 ++++ .../MvccLogEntrySerializationStrategy.java | 14 + .../serialization/SerializationFig.java | 30 +- .../serialization/impl/LogEntryIterator.java | 114 --- .../impl/MinMaxLogEntryIterator.java | 121 ++++ .../MvccLogEntrySerializationProxyImpl.java | 14 + .../MvccLogEntrySerializationStrategyImpl.java | 83 ++- .../migration/MvccEntityDataMigrationImpl.java | 30 +- .../collection/EntityCollectionManagerIT.java | 215 ++++-- .../impl/EntityVersionCleanupTaskTest.java | 715 ------------------- .../impl/EntityVersionCreatedTaskTest.java | 241 ------- .../mvcc/stage/delete/UniqueCleanupTest.java | 712 ++++++++++++++++++ .../mvcc/stage/delete/VersionCompactTest.java | 238 ++++++ .../impl/LogEntryIteratorTest.java | 132 ---- .../impl/MinMaxLogEntryIteratorTest.java | 131 ++++ ...ccLogEntrySerializationStrategyImplTest.java | 132 +++- .../collection/util/LogEntryMock.java | 103 +-- .../src/test/resources/log4j.properties | 1 + .../core/executor/TaskExecutorFactory.java | 95 +++ .../persistence/core/rx/RxTaskScheduler.java | 2 - .../core/task/NamedTaskExecutorImpl.java | 286 -------- .../usergrid/persistence/core/task/Task.java | 48 -- .../persistence/core/task/TaskExecutor.java | 41 -- .../core/task/NamedTaskExecutorImplTest.java | 271 ------- .../usergrid/persistence/graph/GraphFig.java | 26 +- .../persistence/graph/GraphManager.java | 8 +- .../persistence/graph/GraphManagerFactory.java | 2 +- .../persistence/graph/SearchByEdge.java | 6 + .../persistence/graph/SearchByEdgeType.java | 6 + .../persistence/graph/guice/GraphModule.java | 11 - .../graph/impl/GraphManagerImpl.java | 404 ++++------- .../graph/impl/SimpleSearchByEdge.java | 40 +- .../graph/impl/SimpleSearchByEdgeType.java | 29 +- .../shard/impl/ShardGroupCompactionImpl.java | 90 ++- .../graph/CommittedGraphManagerIT.java | 8 +- .../persistence/graph/GraphManagerIT.java | 26 +- .../graph/StorageGraphManagerIT.java | 8 +- .../impl/shard/ShardGroupCompactionTest.java | 6 +- 55 files changed, 2464 insertions(+), 3422 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3e2afe23/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java index 460fc11..8bcc73a 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java @@ -664,7 +664,7 @@ public class CpEntityManager implements EntityManager { //delete it asynchronously indexService.queueEntityDelete( applicationScope, entityId ); - return ecm.delete( entityId ); + return ecm.mark( entityId ); } else { return Observable.empty(); http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3e2afe23/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java index 6c375ef..e7ad682 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java @@ -369,7 +369,7 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application } final ApplicationEntityIndex aei = entityIndexFactory.createApplicationEntityIndex(applicationScope); final GraphManager managementGraphManager = managerCache.getGraphManager(managementAppScope); - final Observable deleteNodeGraph = managementGraphManager.deleteNode(applicationId, Long.MAX_VALUE); + final Observable deleteNodeGraph = managementGraphManager.markNode( applicationId, Long.MAX_VALUE ); final Observable deleteAppFromIndex = aei.deleteApplication(); return Observable.concat(copyConnections, deleteNodeGraph, deleteAppFromIndex) http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3e2afe23/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java index 6adeefc..df3fa82 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java @@ -528,7 +528,7 @@ public class CpRelationManager implements RelationManager { //run our delete final Edge collectionToItemEdge = createCollectionEdge( cpHeadEntity.getId(), collName, memberEntity.getId() ); - gm.deleteEdge( collectionToItemEdge ).toBlocking().last(); + gm.markEdge( collectionToItemEdge ).toBlocking().last(); /** @@ -782,7 +782,7 @@ public class CpRelationManager implements RelationManager { //delete all the edges final Edge lastEdge = - gm.loadEdgeVersions( search ).flatMap( returnedEdge -> gm.deleteEdge( returnedEdge ) ).toBlocking() + gm.loadEdgeVersions( search ).flatMap( returnedEdge -> gm.markEdge( returnedEdge ) ).toBlocking() .lastOrDefault( null ); if ( lastEdge != null ) { http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3e2afe23/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/AppInfoMigrationPlugin.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/AppInfoMigrationPlugin.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/AppInfoMigrationPlugin.java index 30955af..97b87b3 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/AppInfoMigrationPlugin.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/AppInfoMigrationPlugin.java @@ -42,7 +42,6 @@ import org.apache.usergrid.utils.UUIDUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import rx.Observable; -import rx.functions.Func1; import java.util.HashMap; import java.util.Map; @@ -200,7 +199,7 @@ public class AppInfoMigrationPlugin implements MigrationPlugin { final ApplicationScope systemAppScope = getApplicationScope(CpNamingUtils.SYSTEM_APP_ID ); final EntityCollectionManager systemCollectionManager = entityCollectionManagerFactory.createCollectionManager( systemAppScope ); - systemCollectionManager.delete(new SimpleId(uuid, "appinfos")).toBlocking().last(); + systemCollectionManager.mark( new SimpleId( uuid, "appinfos" ) ).toBlocking().last(); } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3e2afe23/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java index 5a329e3..9de8f41 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java @@ -46,11 +46,12 @@ public interface EntityCollectionManager { /** - * @param entityId MarkCommit the entity as deleted + * @param entityId MarkCommit the entity as deleted. Will not actually remove it from cassandra. This operation will + * also remove all unique properties for this entity * * @return The observable of the id after the operation has completed */ - Observable<Id> delete( Id entityId ); + Observable<Id> mark( Id entityId ); /** * @param entityId The entity id to load. @@ -96,6 +97,21 @@ public interface EntityCollectionManager { */ Observable<EntitySet> load( Collection<Id> entityIds ); + /** + * Get all versions of the log entry, from Max to min + * @param entityId + * @return An observable stream of mvccLog entries + */ + Observable<MvccLogEntry> getVersions(final Id entityId); + + /** + * Delete these versions from cassandra. Must be atomic so that read log entries are only removed. Entity data + * and log entry will be deleted + * @param entries + * @return Any observable of all successfully compacted log entries + */ + Observable<MvccLogEntry> delete( final Collection<MvccLogEntry> entries ); + /** * Returns health of entity data store. http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3e2afe23/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/cache/CachedEntityCollectionManager.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/cache/CachedEntityCollectionManager.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/cache/CachedEntityCollectionManager.java index fa35580..7a04b8d 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/cache/CachedEntityCollectionManager.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/cache/CachedEntityCollectionManager.java @@ -26,6 +26,7 @@ import java.util.concurrent.TimeUnit; import org.apache.usergrid.persistence.collection.EntityCollectionManager; import org.apache.usergrid.persistence.collection.EntitySet; import org.apache.usergrid.persistence.collection.FieldSet; +import org.apache.usergrid.persistence.collection.MvccLogEntry; import org.apache.usergrid.persistence.collection.VersionSet; import org.apache.usergrid.persistence.core.util.Health; import org.apache.usergrid.persistence.model.entity.Entity; @@ -84,8 +85,8 @@ public class CachedEntityCollectionManager implements EntityCollectionManager { @Override - public Observable<Id> delete( final Id entityId ) { - return targetEntityCollectionManager.delete( entityId ).doOnNext( new Action1<Id>() { + public Observable<Id> mark( final Id entityId ) { + return targetEntityCollectionManager.mark( entityId ).doOnNext( new Action1<Id>() { @Override public void call( final Id id ) { entityCache.invalidate( id ); @@ -125,6 +126,19 @@ public class CachedEntityCollectionManager implements EntityCollectionManager { return targetEntityCollectionManager.load( entityIds ); } + + @Override + public Observable<MvccLogEntry> getVersions( final Id entityId ) { + return targetEntityCollectionManager.getVersions( entityId ); + } + + + @Override + public Observable<MvccLogEntry> delete( final Collection<MvccLogEntry> entries ) { + return targetEntityCollectionManager.delete( entries ); + } + + @Override public Health getHealth() { return targetEntityCollectionManager.getHealth(); http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3e2afe23/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityDeleted.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityDeleted.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityDeleted.java deleted file mode 100644 index 0e9b62e..0000000 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityDeleted.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. The ASF licenses this file to You - * under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. For additional information regarding - * copyright in this work, please see the NOTICE file in the top level - * directory of this distribution. - */ -package org.apache.usergrid.persistence.collection.event; - - -import java.util.UUID; - -import org.apache.usergrid.persistence.core.scope.ApplicationScope; -import org.apache.usergrid.persistence.model.entity.Id; - - -/** - * - * Invoked when an entity is deleted. The delete log entry is not removed until all instances of this listener has completed. - * If any listener fails with an exception, the entity will not be removed. - * - */ -public interface EntityDeleted { - - - /** - * The event fired when an entity is deleted - * - * @param scope The scope of the entity - * @param entityId The id of the entity - * @param version the entity version - */ - public void deleted( final ApplicationScope scope, final Id entityId, final UUID version); - -} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3e2afe23/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityVersionCreated.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityVersionCreated.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityVersionCreated.java deleted file mode 100644 index 7f1be1a..0000000 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityVersionCreated.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. The ASF licenses this file to You - * under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. For additional information regarding - * copyright in this work, please see the NOTICE file in the top level - * directory of this distribution. - */ -package org.apache.usergrid.persistence.collection.event; - - -import org.apache.usergrid.persistence.core.scope.ApplicationScope; -import org.apache.usergrid.persistence.model.entity.Entity; - - -/** - * Invoked after a new version of an entity has been created. - * The entity should be a complete view of the entity. - */ -public interface EntityVersionCreated { - - /** - * The new version of the entity. Note that this should be a fully merged view of the entity. - * In the case of partial updates, the passed entity should be fully merged with it's previous - * entries. - * @param scope The scope of the entity - * @param entity The fully loaded and merged entity - */ - public void versionCreated( final ApplicationScope scope, final Entity entity ); -} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3e2afe23/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityVersionDeleted.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityVersionDeleted.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityVersionDeleted.java deleted file mode 100644 index 7fd8fe7..0000000 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityVersionDeleted.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. The ASF licenses this file to You - * under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. For additional information regarding - * copyright in this work, please see the NOTICE file in the top level - * directory of this distribution. - */ -package org.apache.usergrid.persistence.collection.event; - - -import java.util.List; - -import org.apache.usergrid.persistence.collection.MvccLogEntry; -import org.apache.usergrid.persistence.core.scope.ApplicationScope; -import org.apache.usergrid.persistence.model.entity.Id; - - -/** - * - * Invoked when an entity version is removed. Note that this is not a deletion of the entity - * itself, only the version itself. - * - */ -public interface EntityVersionDeleted { - - /** - * The version specified was removed. - * - * @param scope The scope of the entity - * @param entityId The entity Id that was removed - * @param entityVersions The versions that are to be removed - */ - public void versionDeleted(final ApplicationScope scope, final Id entityId, - final List<MvccLogEntry> entityVersions); - -} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3e2afe23/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java index b256a44..78c7f37 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java @@ -22,27 +22,14 @@ import org.safehaus.guicyfig.GuicyFigModule; import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory; import org.apache.usergrid.persistence.collection.cache.EntityCacheFig; -import org.apache.usergrid.persistence.collection.event.EntityDeleted; -import org.apache.usergrid.persistence.collection.event.EntityVersionCreated; -import org.apache.usergrid.persistence.collection.event.EntityVersionDeleted; import org.apache.usergrid.persistence.collection.impl.EntityCollectionManagerFactoryImpl; -import org.apache.usergrid.persistence.collection.impl.EntityVersionTaskFactory; import org.apache.usergrid.persistence.collection.mvcc.changelog.ChangeLogGenerator; import org.apache.usergrid.persistence.collection.mvcc.changelog.ChangeLogGeneratorImpl; import org.apache.usergrid.persistence.collection.serialization.SerializationFig; -import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy; import org.apache.usergrid.persistence.collection.serialization.impl.SerializationModule; -import org.apache.usergrid.persistence.collection.serialization.impl.UniqueValueSerializationStrategyImpl; import org.apache.usergrid.persistence.collection.service.impl.ServiceModule; -import org.apache.usergrid.persistence.core.task.NamedTaskExecutorImpl; -import org.apache.usergrid.persistence.core.task.TaskExecutor; import com.google.inject.AbstractModule; -import com.google.inject.Inject; -import com.google.inject.Provides; -import com.google.inject.Singleton; -import com.google.inject.assistedinject.FactoryModuleBuilder; -import com.google.inject.multibindings.Multibinder; /** @@ -61,15 +48,7 @@ public abstract class CollectionModule extends AbstractModule { install( new SerializationModule() ); install( new ServiceModule() ); - install ( new FactoryModuleBuilder().build( EntityVersionTaskFactory.class )); - // users of this module can add their own implemementations - // for more information: https://github.com/google/guice/wiki/Multibindings - - Multibinder.newSetBinder( binder(), EntityVersionDeleted.class ); - Multibinder.newSetBinder( binder(), EntityVersionCreated.class ); - Multibinder.newSetBinder( binder(), EntityDeleted.class ); - // create a guice factor for getting our collection manager bind(EntityCollectionManagerFactory.class).to(EntityCollectionManagerFactoryImpl.class); @@ -81,24 +60,6 @@ public abstract class CollectionModule extends AbstractModule { configureMigrationProvider(); } -// -// @Provides -// @Singleton -// @Inject -// public WriteStart write (final MvccLogEntrySerializationStrategy logStrategy) { -// final WriteStart writeStart = new WriteStart( logStrategy, MvccEntity.Status.COMPLETE); -// -// return writeStart; -// } - - @Inject - @Singleton - @Provides - @CollectionTaskExecutor - public TaskExecutor collectionTaskExecutor(final SerializationFig serializationFig){ - return new NamedTaskExecutorImpl( "collectiontasks", - serializationFig.getTaskPoolThreadSize(), serializationFig.getTaskPoolQueueSize() ); - } /** http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3e2afe23/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java index 761c4b5..50a4bfc 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java @@ -27,9 +27,10 @@ import org.apache.usergrid.persistence.collection.EntityCollectionManager; import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory; import org.apache.usergrid.persistence.collection.cache.CachedEntityCollectionManager; import org.apache.usergrid.persistence.collection.cache.EntityCacheFig; -import org.apache.usergrid.persistence.collection.guice.CollectionTaskExecutor; import org.apache.usergrid.persistence.collection.mvcc.stage.delete.MarkCommit; import org.apache.usergrid.persistence.collection.mvcc.stage.delete.MarkStart; +import org.apache.usergrid.persistence.collection.mvcc.stage.delete.UniqueCleanup; +import org.apache.usergrid.persistence.collection.mvcc.stage.delete.VersionCompact; import org.apache.usergrid.persistence.collection.mvcc.stage.write.RollbackAction; import org.apache.usergrid.persistence.collection.mvcc.stage.write.WriteCommit; import org.apache.usergrid.persistence.collection.mvcc.stage.write.WriteOptimisticVerify; @@ -37,11 +38,11 @@ import org.apache.usergrid.persistence.collection.mvcc.stage.write.WriteStart; import org.apache.usergrid.persistence.collection.mvcc.stage.write.WriteUniqueVerify; import org.apache.usergrid.persistence.collection.serialization.MvccEntitySerializationStrategy; import org.apache.usergrid.persistence.collection.serialization.MvccLogEntrySerializationStrategy; +import org.apache.usergrid.persistence.collection.serialization.SerializationFig; import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy; import org.apache.usergrid.persistence.core.metrics.MetricsFactory; import org.apache.usergrid.persistence.core.rx.RxTaskScheduler; import org.apache.usergrid.persistence.core.scope.ApplicationScope; -import org.apache.usergrid.persistence.core.task.TaskExecutor; import com.google.common.base.Preconditions; import com.google.common.cache.CacheBuilder; @@ -67,12 +68,13 @@ public class EntityCollectionManagerFactoryImpl implements EntityCollectionManag private final RollbackAction rollback; private final MarkStart markStart; private final MarkCommit markCommit; + private final UniqueCleanup uniqueCleanup; + private final VersionCompact versionCompact; + private final SerializationFig serializationFig; private final MvccEntitySerializationStrategy entitySerializationStrategy; private final UniqueValueSerializationStrategy uniqueValueSerializationStrategy; private final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy; private final Keyspace keyspace; - private final EntityVersionTaskFactory entityVersionTaskFactory; - private final TaskExecutor taskExecutor; private final EntityCacheFig entityCacheFig; private final MetricsFactory metricsFactory; private final RxTaskScheduler rxTaskScheduler; @@ -84,10 +86,11 @@ public class EntityCollectionManagerFactoryImpl implements EntityCollectionManag //create the target EM that will perform logic final EntityCollectionManager target = new EntityCollectionManagerImpl( writeStart, writeVerifyUnique, - writeOptimisticVerify, writeCommit, rollback, markStart, markCommit, + writeOptimisticVerify, writeCommit, rollback, markStart, markCommit, uniqueCleanup, versionCompact, entitySerializationStrategy, uniqueValueSerializationStrategy, - mvccLogEntrySerializationStrategy, keyspace,entityVersionTaskFactory, taskExecutor, scope, metricsFactory, - rxTaskScheduler ); + mvccLogEntrySerializationStrategy, keyspace, + metricsFactory, serializationFig, + rxTaskScheduler, scope ); final EntityCollectionManager proxy = new CachedEntityCollectionManager(entityCacheFig, target ); @@ -102,13 +105,12 @@ public class EntityCollectionManagerFactoryImpl implements EntityCollectionManag final WriteOptimisticVerify writeOptimisticVerify, final WriteCommit writeCommit, final RollbackAction rollback, final MarkStart markStart, final MarkCommit markCommit, - final MvccEntitySerializationStrategy entitySerializationStrategy, + final UniqueCleanup uniqueCleanup, final VersionCompact versionCompact, + final SerializationFig serializationFig, final + MvccEntitySerializationStrategy entitySerializationStrategy, final UniqueValueSerializationStrategy uniqueValueSerializationStrategy, final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy, - final Keyspace keyspace, - final EntityVersionTaskFactory entityVersionTaskFactory, - @CollectionTaskExecutor final TaskExecutor taskExecutor, final - EntityCacheFig entityCacheFig, + final Keyspace keyspace, final EntityCacheFig entityCacheFig, MetricsFactory metricsFactory, final RxTaskScheduler rxTaskScheduler ) { this.writeStart = writeStart; @@ -118,12 +120,13 @@ public class EntityCollectionManagerFactoryImpl implements EntityCollectionManag this.rollback = rollback; this.markStart = markStart; this.markCommit = markCommit; + this.uniqueCleanup = uniqueCleanup; + this.versionCompact = versionCompact; + this.serializationFig = serializationFig; this.entitySerializationStrategy = entitySerializationStrategy; this.uniqueValueSerializationStrategy = uniqueValueSerializationStrategy; this.mvccLogEntrySerializationStrategy = mvccLogEntrySerializationStrategy; this.keyspace = keyspace; - this.entityVersionTaskFactory = entityVersionTaskFactory; - this.taskExecutor = taskExecutor; this.entityCacheFig = entityCacheFig; this.metricsFactory = metricsFactory; this.rxTaskScheduler = rxTaskScheduler; http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3e2afe23/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java index 6f10e86..7a32d72 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java @@ -22,6 +22,7 @@ package org.apache.usergrid.persistence.collection.impl; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.Iterator; import java.util.List; import java.util.UUID; @@ -32,11 +33,13 @@ import org.apache.usergrid.persistence.collection.EntityCollectionManager; import org.apache.usergrid.persistence.collection.EntitySet; import org.apache.usergrid.persistence.collection.FieldSet; import org.apache.usergrid.persistence.collection.MvccEntity; +import org.apache.usergrid.persistence.collection.MvccLogEntry; import org.apache.usergrid.persistence.collection.VersionSet; -import org.apache.usergrid.persistence.collection.guice.CollectionTaskExecutor; import org.apache.usergrid.persistence.collection.mvcc.stage.CollectionIoEvent; import org.apache.usergrid.persistence.collection.mvcc.stage.delete.MarkCommit; import org.apache.usergrid.persistence.collection.mvcc.stage.delete.MarkStart; +import org.apache.usergrid.persistence.collection.mvcc.stage.delete.UniqueCleanup; +import org.apache.usergrid.persistence.collection.mvcc.stage.delete.VersionCompact; import org.apache.usergrid.persistence.collection.mvcc.stage.write.RollbackAction; import org.apache.usergrid.persistence.collection.mvcc.stage.write.WriteCommit; import org.apache.usergrid.persistence.collection.mvcc.stage.write.WriteOptimisticVerify; @@ -44,15 +47,17 @@ import org.apache.usergrid.persistence.collection.mvcc.stage.write.WriteStart; import org.apache.usergrid.persistence.collection.mvcc.stage.write.WriteUniqueVerify; import org.apache.usergrid.persistence.collection.serialization.MvccEntitySerializationStrategy; import org.apache.usergrid.persistence.collection.serialization.MvccLogEntrySerializationStrategy; +import org.apache.usergrid.persistence.collection.serialization.SerializationFig; import org.apache.usergrid.persistence.collection.serialization.UniqueValue; import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy; import org.apache.usergrid.persistence.collection.serialization.UniqueValueSet; +import org.apache.usergrid.persistence.collection.serialization.impl.MinMaxLogEntryIterator; import org.apache.usergrid.persistence.collection.serialization.impl.MutableFieldSet; import org.apache.usergrid.persistence.core.metrics.MetricsFactory; +import org.apache.usergrid.persistence.core.metrics.ObservableTimer; +import org.apache.usergrid.persistence.core.rx.ObservableIterator; import org.apache.usergrid.persistence.core.rx.RxTaskScheduler; import org.apache.usergrid.persistence.core.scope.ApplicationScope; -import org.apache.usergrid.persistence.core.task.Task; -import org.apache.usergrid.persistence.core.task.TaskExecutor; import org.apache.usergrid.persistence.core.util.Health; import org.apache.usergrid.persistence.core.util.ValidationUtils; import org.apache.usergrid.persistence.model.entity.Entity; @@ -60,7 +65,6 @@ import org.apache.usergrid.persistence.model.entity.Id; import org.apache.usergrid.persistence.model.field.Field; import org.apache.usergrid.persistence.model.util.UUIDGenerator; -import com.codahale.metrics.Meter; import com.codahale.metrics.Timer; import com.google.common.base.Preconditions; import com.google.inject.Inject; @@ -73,13 +77,9 @@ import com.netflix.astyanax.model.ColumnFamily; import com.netflix.astyanax.model.CqlResult; import com.netflix.astyanax.serializers.StringSerializer; -import rx.Notification; import rx.Observable; import rx.Subscriber; import rx.functions.Action0; -import rx.functions.Action1; -import rx.functions.Func1; -import rx.schedulers.Schedulers; /** @@ -97,6 +97,8 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager { private final WriteOptimisticVerify writeOptimisticVerify; private final WriteCommit writeCommit; private final RollbackAction rollback; + private final UniqueCleanup uniqueCleanup; + private final VersionCompact versionCompact; //delete stages @@ -107,41 +109,39 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager { private final MvccEntitySerializationStrategy entitySerializationStrategy; private final UniqueValueSerializationStrategy uniqueValueSerializationStrategy; - private final EntityVersionTaskFactory entityVersionTaskFactory; - private final TaskExecutor taskExecutor; + private final SerializationFig serializationFig; - private final RxTaskScheduler rxTaskScheduler; private final Keyspace keyspace; private final Timer writeTimer; - private final Meter writeMeter; private final Timer deleteTimer; + private final Timer fieldIdTimer; + private final Timer fieldEntityTimer; private final Timer updateTimer; private final Timer loadTimer; private final Timer getLatestTimer; - private final Meter deleteMeter; - private final Meter getLatestMeter; - private final Meter loadMeter; - private final Meter updateMeter; private final ApplicationScope applicationScope; + private final RxTaskScheduler rxTaskScheduler; @Inject public EntityCollectionManagerImpl( final WriteStart writeStart, final WriteUniqueVerify writeVerifyUnique, - final WriteOptimisticVerify writeOptimisticVerify, final WriteCommit - writeCommit, - final RollbackAction rollback, final MarkStart markStart, - final MarkCommit markCommit, final MvccEntitySerializationStrategy entitySerializationStrategy, + final WriteOptimisticVerify writeOptimisticVerify, + final WriteCommit writeCommit, final RollbackAction rollback, + final MarkStart markStart, final MarkCommit markCommit, + final UniqueCleanup uniqueCleanup, final VersionCompact versionCompact, + final MvccEntitySerializationStrategy entitySerializationStrategy, final UniqueValueSerializationStrategy uniqueValueSerializationStrategy, final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy, - final Keyspace keyspace, final EntityVersionTaskFactory entityVersionTaskFactory, - @CollectionTaskExecutor final TaskExecutor taskExecutor, @Assisted final ApplicationScope applicationScope, - final MetricsFactory metricsFactory, - - final RxTaskScheduler rxTaskScheduler ) { + final Keyspace keyspace, final MetricsFactory metricsFactory, + final SerializationFig serializationFig, final RxTaskScheduler rxTaskScheduler, + @Assisted final ApplicationScope applicationScope ) { this.uniqueValueSerializationStrategy = uniqueValueSerializationStrategy; this.entitySerializationStrategy = entitySerializationStrategy; + this.uniqueCleanup = uniqueCleanup; + this.versionCompact = versionCompact; + this.serializationFig = serializationFig; this.rxTaskScheduler = rxTaskScheduler; ValidationUtils.validateApplicationScope( applicationScope ); @@ -158,21 +158,16 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager { this.keyspace = keyspace; - this.entityVersionTaskFactory = entityVersionTaskFactory; - this.taskExecutor = taskExecutor; this.applicationScope = applicationScope; this.mvccLogEntrySerializationStrategy = mvccLogEntrySerializationStrategy; - this.writeTimer = metricsFactory.getTimer(EntityCollectionManagerImpl.class,"write.timer"); - this.writeMeter = metricsFactory.getMeter(EntityCollectionManagerImpl.class, "write.meter"); - this.deleteTimer = metricsFactory.getTimer(EntityCollectionManagerImpl.class, "delete.timer"); - this.deleteMeter= metricsFactory.getMeter(EntityCollectionManagerImpl.class, "delete.meter"); - this.updateTimer = metricsFactory.getTimer(EntityCollectionManagerImpl.class, "update.timer"); - this.updateMeter = metricsFactory.getMeter(EntityCollectionManagerImpl.class, "update.meter"); - this.loadTimer = metricsFactory.getTimer(EntityCollectionManagerImpl.class,"load.timer"); - this.loadMeter = metricsFactory.getMeter(EntityCollectionManagerImpl.class, "load.meter"); - this.getLatestTimer = metricsFactory.getTimer(EntityCollectionManagerImpl.class,"latest.timer"); - this.getLatestMeter = metricsFactory.getMeter(EntityCollectionManagerImpl.class, "latest.meter"); + this.writeTimer = metricsFactory.getTimer( EntityCollectionManagerImpl.class, "write" ); + this.deleteTimer = metricsFactory.getTimer( EntityCollectionManagerImpl.class, "delete" ); + this.fieldIdTimer = metricsFactory.getTimer( EntityCollectionManagerImpl.class, "fieldId" ); + this.fieldEntityTimer = metricsFactory.getTimer( EntityCollectionManagerImpl.class, "fieldEntity" ); + this.updateTimer = metricsFactory.getTimer( EntityCollectionManagerImpl.class, "update" ); + this.loadTimer = metricsFactory.getTimer( EntityCollectionManagerImpl.class, "load" ); + this.getLatestTimer = metricsFactory.getTimer( EntityCollectionManagerImpl.class, "latest" ); } @@ -192,74 +187,26 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager { Observable<CollectionIoEvent<MvccEntity>> observable = stageRunner( writeData, writeStart ); - // execute all validation stages concurrently. Needs refactored when this is done. - // https://github.com/Netflix/RxJava/issues/627 - // observable = Concurrent.concurrent( observable, Schedulers.io(), new WaitZip(), - // writeVerifyUnique, writeOptimisticVerify ); - final Timer.Context timer = writeTimer.time(); - return observable.map(writeCommit).doOnNext(new Action1<Entity>() { - @Override - public void call(final Entity entity) { - //TODO fire the created task first then the entityVersioncleanup - taskExecutor.submit( entityVersionTaskFactory.getCreatedTask( applicationScope, entity )); - taskExecutor.submit( entityVersionTaskFactory.getCleanupTask( applicationScope, entityId, - entity.getVersion(), false )); - //post-processing to come later. leave it empty for now. - } - }).doOnError( rollback ) - .doOnEach( new Action1<Notification<? super Entity>>() { - @Override - public void call( Notification<? super Entity> notification ) { - writeMeter.mark(); - } - } ) - .doOnCompleted( new Action0() { - @Override - public void call() { - timer.stop(); - } - } ); + final Observable<Entity> write = observable.map( writeCommit ); + + return ObservableTimer.time( write, writeTimer ); } @Override - public Observable<Id> delete( final Id entityId ) { + public Observable<Id> mark( final Id entityId ) { Preconditions.checkNotNull( entityId, "Entity id is required in this stage" ); Preconditions.checkNotNull( entityId.getUuid(), "Entity id is required in this stage" ); Preconditions.checkNotNull( entityId.getType(), "Entity type is required in this stage" ); - final Timer.Context timer = deleteTimer.time(); - Observable<Id> o = Observable.just( new CollectionIoEvent<Id>( applicationScope, entityId ) ) - .map(markStart) - .doOnNext( markCommit ) - .map(new Func1<CollectionIoEvent<MvccEntity>, Id>() { - - @Override - public Id call(final CollectionIoEvent<MvccEntity> mvccEntityCollectionIoEvent) { - MvccEntity entity = mvccEntityCollectionIoEvent.getEvent(); - Task<Void> task = entityVersionTaskFactory - .getDeleteTask( applicationScope, entity.getId(), entity.getVersion() ); - taskExecutor.submit(task); - return entity.getId(); - } - } - ) - .doOnNext(new Action1<Id>() { - @Override - public void call(Id id) { - deleteMeter.mark(); - } - }) - .doOnCompleted( new Action0() { - @Override - public void call() { - timer.stop(); - } - } ); + Observable<Id> o = Observable.just( new CollectionIoEvent<>( applicationScope, entityId ) ).map( markStart ) + .doOnNext( markCommit ).compose( uniqueCleanup ).map( + entityEvent -> entityEvent.getEvent().getId() ); + - return o; + return ObservableTimer.time( o, deleteTimer ); } @@ -270,35 +217,19 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager { Preconditions.checkNotNull( entityId.getUuid(), "Entity id uuid required in load stage" ); Preconditions.checkNotNull( entityId.getType(), "Entity id type required in load stage" ); - final Timer.Context timer = loadTimer.time(); - return load( Collections.singleton( entityId ) ).flatMap(new Func1<EntitySet, Observable<Entity>>() { - @Override - public Observable<Entity> call(final EntitySet entitySet) { - final MvccEntity entity = entitySet.getEntity(entityId); - - if (entity == null || !entity.getEntity().isPresent()) { - return Observable.empty(); - } + final Observable<Entity> entityObservable = load( Collections.singleton( entityId ) ).flatMap( entitySet -> { + final MvccEntity entity = entitySet.getEntity( entityId ); - return Observable.just( entity.getEntity().get() ); + if ( entity == null || !entity.getEntity().isPresent() ) { + return Observable.empty(); } - }) - .doOnNext( new Action1<Entity>() { - @Override - public void call( Entity entity ) { - loadMeter.mark(); - } - } ) - .doOnCompleted( new Action0() { - @Override - public void call() { - timer.stop(); - } - } ); - } + return Observable.just( entity.getEntity().get() ); + } ); + return ObservableTimer.time( entityObservable, loadTimer ); + } @Override @@ -306,15 +237,13 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager { Preconditions.checkNotNull( entityIds, "entityIds cannot be null" ); - final Timer.Context timer = loadTimer.time(); - - return Observable.create( new Observable.OnSubscribe<EntitySet>() { + final Observable<EntitySet> entitySetObservable = Observable.create( new Observable.OnSubscribe<EntitySet>() { @Override public void call( final Subscriber<? super EntitySet> subscriber ) { try { final EntitySet results = - entitySerializationStrategy.load( applicationScope, entityIds, UUIDGenerator.newTimeUUID() ); + entitySerializationStrategy.load( applicationScope, entityIds, UUIDGenerator.newTimeUUID() ); subscriber.onNext( results ); subscriber.onCompleted(); @@ -323,162 +252,167 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager { subscriber.onError( e ); } } - } ) - .doOnNext(new Action1<EntitySet>() { - @Override - public void call(EntitySet entitySet) { - loadMeter.mark(); - } - }) - .doOnCompleted( new Action0() { - @Override - public void call() { - timer.stop(); - } - } ); + } ); + + + return ObservableTimer.time( entitySetObservable, loadTimer ); } @Override - public Observable<Id> getIdField(final String type, final Field field ) { - final List<Field> fields = Collections.singletonList( field ); - return rx.Observable.from( fields ).map( new Func1<Field, Id>() { + public Observable<MvccLogEntry> getVersions( final Id entityId ) { + ValidationUtils.verifyIdentity( entityId ); + + return Observable.create( new ObservableIterator<MvccLogEntry>( "Log entry iterator" ) { @Override - public Id call( Field field ) { - try { - final UniqueValueSet set = uniqueValueSerializationStrategy.load( applicationScope, type, fields ); - final UniqueValue value = set.getValue( field.getName() ); - return value == null ? null : value.getEntityId(); - } - catch ( ConnectionException e ) { - logger.error( "Failed to getIdField", e ); - throw new RuntimeException( e ); - } + protected Iterator<MvccLogEntry> getIterator() { + return new MinMaxLogEntryIterator( mvccLogEntrySerializationStrategy, applicationScope, entityId, + serializationFig.getBufferSize() ); } } ); } + @Override + public Observable<MvccLogEntry> delete( final Collection<MvccLogEntry> entries ) { + Preconditions.checkNotNull( entries, "entries must not be null" ); + + + return Observable.from( entries ).map( logEntry -> new CollectionIoEvent<>( applicationScope, logEntry ) ) + .compose( versionCompact ).map( event -> event.getEvent() ); + } + + + @Override + public Observable<Id> getIdField( final String type, final Field field ) { + final List<Field> fields = Collections.singletonList( field ); + final Observable<Id> idObservable = Observable.from( fields ).map( field1 -> { + try { + final UniqueValueSet set = uniqueValueSerializationStrategy.load( applicationScope, type, fields ); + final UniqueValue value = set.getValue( field1.getName() ); + return value == null ? null : value.getEntityId(); + } + catch ( ConnectionException e ) { + logger.error( "Failed to getIdField", e ); + throw new RuntimeException( e ); + } + } ); + + return ObservableTimer.time( idObservable, fieldIdTimer ); + } + + /** * Retrieves all entities that correspond to each field given in the Collection. - * @param fields - * @return */ @Override - public Observable<FieldSet> getEntitiesFromFields(final String type, final Collection<Field> fields ) { - return rx.Observable.just(fields).map( new Func1<Collection<Field>, FieldSet>() { - @Override - public FieldSet call( Collection<Field> fields ) { - try { - - final UUID startTime = UUIDGenerator.newTimeUUID(); + public Observable<FieldSet> getEntitiesFromFields( final String type, final Collection<Field> fields ) { + final Observable<FieldSet> fieldSetObservable = Observable.just( fields ).map( fields1 -> { + try { - //Get back set of unique values that correspond to collection of fields - UniqueValueSet set = uniqueValueSerializationStrategy.load( applicationScope,type, fields ); + final UUID startTime = UUIDGenerator.newTimeUUID(); - //Short circut if we don't have any uniqueValues from the given fields. - if(!set.iterator().hasNext()){ - return new MutableFieldSet( 0 ); - } + //Get back set of unique values that correspond to collection of fields + UniqueValueSet set = uniqueValueSerializationStrategy.load( applicationScope, type, fields1 ); + //Short circuit if we don't have any uniqueValues from the given fields. + if ( !set.iterator().hasNext() ) { + return new MutableFieldSet( 0 ); + } - //loop through each field, and construct an entity load - List<Id> entityIds = new ArrayList<>(fields.size()); - List<UniqueValue> uniqueValues = new ArrayList<>(fields.size()); - for(final Field expectedField: fields) { + //loop through each field, and construct an entity load + List<Id> entityIds = new ArrayList<>( fields1.size() ); + List<UniqueValue> uniqueValues = new ArrayList<>( fields1.size() ); - UniqueValue value = set.getValue(expectedField.getName()); + for ( final Field expectedField : fields1 ) { - if(value ==null){ - logger.debug( "Field does not correspond to a unique value" ); - } + UniqueValue value = set.getValue( expectedField.getName() ); - entityIds.add(value.getEntityId()); - uniqueValues.add(value); + if ( value == null ) { + logger.debug( "Field does not correspond to a unique value" ); } - //Load a entity for each entityId we retrieved. - final EntitySet entitySet = entitySerializationStrategy.load(applicationScope, entityIds, startTime); - - //now loop through and ensure the entities are there. - final MutationBatch deleteBatch = keyspace.prepareMutationBatch(); - - final MutableFieldSet response = new MutableFieldSet(fields.size()); + entityIds.add( value.getEntityId() ); + uniqueValues.add( value ); + } - for(final UniqueValue expectedUnique: uniqueValues) { - final MvccEntity entity = entitySet.getEntity(expectedUnique.getEntityId()); + //Load a entity for each entityId we retrieved. + final EntitySet entitySet = entitySerializationStrategy.load( applicationScope, entityIds, startTime ); - //bad unique value, delete this, it's inconsistent - if(entity == null || !entity.getEntity().isPresent()){ - final MutationBatch valueDelete = uniqueValueSerializationStrategy.delete(applicationScope, expectedUnique); - deleteBatch.mergeShallow(valueDelete); - continue; - } + //now loop through and ensure the entities are there. + final MutationBatch deleteBatch = keyspace.prepareMutationBatch(); + final MutableFieldSet response = new MutableFieldSet( fields1.size() ); - //else add it to our result set - response.addEntity(expectedUnique.getField(),entity); + for ( final UniqueValue expectedUnique : uniqueValues ) { + final MvccEntity entity = entitySet.getEntity( expectedUnique.getEntityId() ); + //bad unique value, delete this, it's inconsistent + if ( entity == null || !entity.getEntity().isPresent() ) { + final MutationBatch valueDelete = + uniqueValueSerializationStrategy.delete( applicationScope, expectedUnique ); + deleteBatch.mergeShallow( valueDelete ); + continue; } - //TODO: explore making this an Async process - //We'll repair it again if we have to - deleteBatch.execute(); - return response; + //else add it to our result set + response.addEntity( expectedUnique.getField(), entity ); + } + //TODO: explore making this an Async process + //We'll repair it again if we have to + deleteBatch.execute(); - } - catch ( ConnectionException e ) { - logger.error( "Failed to getIdField", e ); - throw new RuntimeException( e ); - } + return response; + } + catch ( ConnectionException e ) { + logger.error( "Failed to getIdField", e ); + throw new RuntimeException( e ); } } ); - } + return ObservableTimer.time( fieldSetObservable, fieldEntityTimer ); + } // fire the stages public Observable<CollectionIoEvent<MvccEntity>> stageRunner( CollectionIoEvent<Entity> writeData, WriteStart writeState ) { - return Observable.just( writeData ).map( writeState ).doOnNext( new Action1<CollectionIoEvent<MvccEntity>>() { - - @Override - public void call( final CollectionIoEvent<MvccEntity> mvccEntityCollectionIoEvent ) { + return Observable.just( writeData ).map( writeState ).flatMap( mvccEntityCollectionIoEvent -> { - Observable<CollectionIoEvent<MvccEntity>> unique = - Observable.just( mvccEntityCollectionIoEvent ).subscribeOn( rxTaskScheduler.getAsyncIOScheduler() ) - .doOnNext( writeVerifyUnique ); + Observable<CollectionIoEvent<MvccEntity>> uniqueObservable = + Observable.just( mvccEntityCollectionIoEvent ).subscribeOn( rxTaskScheduler.getAsyncIOScheduler() ) + .doOnNext( writeVerifyUnique ); - // optimistic verification - Observable<CollectionIoEvent<MvccEntity>> optimistic = - Observable.just( mvccEntityCollectionIoEvent ).subscribeOn( rxTaskScheduler.getAsyncIOScheduler() ) - .doOnNext( writeOptimisticVerify ); + // optimistic verification + Observable<CollectionIoEvent<MvccEntity>> optimisticObservable = + Observable.just( mvccEntityCollectionIoEvent ).subscribeOn( rxTaskScheduler.getAsyncIOScheduler() ) + .doOnNext( writeOptimisticVerify ); + final Observable<CollectionIoEvent<MvccEntity>> zip = + Observable.zip( uniqueObservable, optimisticObservable, ( unique, optimistic ) -> optimistic ); - //wait for both to finish - Observable.merge( unique, optimistic ).toBlocking().last(); - } - } ); + return zip; + } ); } @Override public Observable<VersionSet> getLatestVersion( final Collection<Id> entityIds ) { - final Timer.Context timer = getLatestTimer.time(); - return Observable.create( new Observable.OnSubscribe<VersionSet>() { + + final Observable<VersionSet> observable = Observable.create( new Observable.OnSubscribe<VersionSet>() { @Override public void call( final Subscriber<? super VersionSet> subscriber ) { try { final VersionSet logEntries = mvccLogEntrySerializationStrategy - .load( applicationScope, entityIds, UUIDGenerator.newTimeUUID() ); + .load( applicationScope, entityIds, UUIDGenerator.newTimeUUID() ); subscriber.onNext( logEntries ); subscriber.onCompleted(); @@ -487,13 +421,9 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager { subscriber.onError( e ); } } - } ) - .doOnCompleted( new Action0() { - @Override - public void call() { - timer.stop(); - } - } ); + } ); + + return ObservableTimer.time( observable, getLatestTimer ); } @@ -502,11 +432,11 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager { try { ColumnFamily<String, String> CF_SYSTEM_LOCAL = - new ColumnFamily<String, String>( "system.local", StringSerializer.get(), StringSerializer.get(), - StringSerializer.get() ); + new ColumnFamily<String, String>( "system.local", StringSerializer.get(), StringSerializer.get(), + StringSerializer.get() ); OperationResult<CqlResult<String, String>> result = - keyspace.prepareQuery( CF_SYSTEM_LOCAL ).withCql( "SELECT now() FROM system.local;" ).execute(); + keyspace.prepareQuery( CF_SYSTEM_LOCAL ).withCql( "SELECT now() FROM system.local;" ).execute(); if ( result.getResult().getRows().size() == 1 ) { return Health.GREEN; http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3e2afe23/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityDeletedTask.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityDeletedTask.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityDeletedTask.java deleted file mode 100644 index 1753d26..0000000 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityDeletedTask.java +++ /dev/null @@ -1,137 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.usergrid.persistence.collection.impl; - - -import java.util.Set; -import java.util.UUID; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.usergrid.persistence.collection.event.EntityDeleted; -import org.apache.usergrid.persistence.collection.serialization.MvccEntitySerializationStrategy; -import org.apache.usergrid.persistence.collection.serialization.MvccLogEntrySerializationStrategy; -import org.apache.usergrid.persistence.core.scope.ApplicationScope; -import org.apache.usergrid.persistence.core.task.Task; -import org.apache.usergrid.persistence.model.entity.Id; - -import com.google.inject.Inject; -import com.google.inject.assistedinject.Assisted; -import com.netflix.astyanax.MutationBatch; - -import rx.Observable; -import rx.schedulers.Schedulers; - - -/** - * Fires Cleanup Task - */ -public class EntityDeletedTask implements Task<Void> { - private static final Logger LOG = LoggerFactory.getLogger(EntityDeletedTask.class); - - private final EntityVersionTaskFactory entityVersionTaskFactory; - private final MvccLogEntrySerializationStrategy logEntrySerializationStrategy; - private final MvccEntitySerializationStrategy entitySerializationStrategy; - private final Set<EntityDeleted> listeners; - private final ApplicationScope collectionScope; - private final Id entityId; - private final UUID version; - - - @Inject - public EntityDeletedTask( - EntityVersionTaskFactory entityVersionTaskFactory, - final MvccLogEntrySerializationStrategy logEntrySerializationStrategy, - final MvccEntitySerializationStrategy entitySerializationStrategy, - final Set<EntityDeleted> listeners, // MUST be a set or Guice will not inject - @Assisted final ApplicationScope collectionScope, - @Assisted final Id entityId, - @Assisted final UUID version) { - - this.entityVersionTaskFactory = entityVersionTaskFactory; - this.logEntrySerializationStrategy = logEntrySerializationStrategy; - this.entitySerializationStrategy = entitySerializationStrategy; - this.listeners = listeners; - this.collectionScope = collectionScope; - this.entityId = entityId; - this.version = version; - } - - - @Override - public void exceptionThrown(Throwable throwable) { - LOG.error( "Unable to run update task for collection {} with entity {} and version {}", - new Object[] { collectionScope, entityId, version }, throwable ); - } - - - @Override - public Void rejected() { - try { - call(); - } - catch ( Exception e ) { - throw new RuntimeException( "Exception thrown in call task", e ); - } - - return null; - } - - - @Override - public Void call() throws Exception { - - entityVersionTaskFactory.getCleanupTask( collectionScope, entityId, version, true ).call(); - - fireEvents(); - final MutationBatch entityDelete = entitySerializationStrategy.delete(collectionScope, entityId, version); - final MutationBatch logDelete = logEntrySerializationStrategy.delete(collectionScope, entityId, version); - - entityDelete.execute(); - logDelete.execute(); -// - return null; - } - - - private void fireEvents() { - final int listenerSize = listeners.size(); - - if ( listenerSize == 0 ) { - return; - } - - if ( listenerSize == 1 ) { - listeners.iterator().next().deleted( collectionScope, entityId,version ); - return; - } - - LOG.debug( "Started firing {} listeners", listenerSize ); - - //if we have more than 1, run them on the rx scheduler for a max of 10 operations at a time - Observable.from(listeners).flatMap( currentListener -> Observable.just( currentListener ).doOnNext( listener -> { - listener.deleted( collectionScope, entityId, version ); - } ).subscribeOn( Schedulers.io() ), 10 ).toBlocking().last(); - - LOG.debug( "Finished firing {} listeners", listenerSize ); - } - - -} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3e2afe23/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java deleted file mode 100644 index b5f9085..0000000 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java +++ /dev/null @@ -1,247 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. The ASF licenses this file to You - * under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. For additional information regarding - * copyright in this work, please see the NOTICE file in the top level - * directory of this distribution. - */ -package org.apache.usergrid.persistence.collection.impl; - - -import java.util.Iterator; -import java.util.List; -import java.util.Set; -import java.util.UUID; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.usergrid.persistence.collection.MvccLogEntry; -import org.apache.usergrid.persistence.collection.event.EntityVersionDeleted; -import org.apache.usergrid.persistence.collection.serialization.MvccLogEntrySerializationStrategy; -import org.apache.usergrid.persistence.collection.serialization.SerializationFig; -import org.apache.usergrid.persistence.collection.serialization.UniqueValue; -import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy; -import org.apache.usergrid.persistence.collection.serialization.impl.LogEntryIterator; -import org.apache.usergrid.persistence.core.rx.ObservableIterator; -import org.apache.usergrid.persistence.core.scope.ApplicationScope; -import org.apache.usergrid.persistence.core.task.Task; -import org.apache.usergrid.persistence.model.entity.Id; - -import com.google.inject.Inject; -import com.google.inject.assistedinject.Assisted; -import com.netflix.astyanax.Keyspace; -import com.netflix.astyanax.MutationBatch; -import com.netflix.astyanax.connectionpool.exceptions.ConnectionException; - -import rx.Observable; -import rx.functions.Action1; -import rx.functions.Func1; -import rx.observables.BlockingObservable; -import rx.schedulers.Schedulers; - - -/** - * Cleans up previous versions from the specified version. Note that this means the version passed in the io event is - * retained, the range is exclusive. - */ -public class EntityVersionCleanupTask implements Task<Void> { - - private static final Logger logger = LoggerFactory.getLogger( EntityVersionCleanupTask.class ); - - private final Set<EntityVersionDeleted> listeners; - - private final MvccLogEntrySerializationStrategy logEntrySerializationStrategy; - private UniqueValueSerializationStrategy uniqueValueSerializationStrategy; - private final Keyspace keyspace; - - private final SerializationFig serializationFig; - - private final ApplicationScope scope; - private final Id entityId; - private final UUID version; - private final boolean includeVersion; - - - @Inject - public EntityVersionCleanupTask( - final SerializationFig serializationFig, - final MvccLogEntrySerializationStrategy logEntrySerializationStrategy, - final UniqueValueSerializationStrategy uniqueValueSerializationStrategy, - final Keyspace keyspace, - final Set<EntityVersionDeleted> listeners, // MUST be a set or Guice will not inject - @Assisted final ApplicationScope scope, - @Assisted final Id entityId, - @Assisted final UUID version, - @Assisted final boolean includeVersion) { - - this.serializationFig = serializationFig; - this.logEntrySerializationStrategy = logEntrySerializationStrategy; - this.uniqueValueSerializationStrategy = uniqueValueSerializationStrategy; - this.keyspace = keyspace; - this.listeners = listeners; - this.scope = scope; - this.entityId = entityId; - this.version = version; - - this.includeVersion = includeVersion; - } - - - @Override - public void exceptionThrown( final Throwable throwable ) { - logger.error( "Unable to run update task for collection {} with entity {} and version {}", - new Object[] { scope, entityId, version }, throwable ); - } - - - @Override - public Void rejected() { - //Our task was rejected meaning our queue was full. We need this operation to run, - // so we'll run it in our current thread - try { - call(); - } - catch ( Exception e ) { - throw new RuntimeException( "Exception thrown in call task", e ); - } - - return null; - } - - - @Override - public Void call() throws Exception { - //TODO Refactor this logic into a a class that can be invoked from anywhere - //iterate all unique values - final BlockingObservable<Long> uniqueValueCleanup = - Observable.create( new ObservableIterator<UniqueValue>( "Unique value load" ) { - @Override - protected Iterator<UniqueValue> getIterator() { - return uniqueValueSerializationStrategy.getAllUniqueFields( scope, entityId ); - } - } ) - - //skip current versions - .skipWhile( new Func1<UniqueValue, Boolean>() { - @Override - public Boolean call( final UniqueValue uniqueValue ) { - return !includeVersion && version.equals( uniqueValue.getEntityVersion() ); - } - } ) - //buffer our buffer size, then roll them all up in a single batch mutation - .buffer( serializationFig.getBufferSize() ).doOnNext( new Action1<List<UniqueValue>>() { - @Override - public void call( final List<UniqueValue> uniqueValues ) { - final MutationBatch uniqueCleanupBatch = keyspace.prepareMutationBatch(); - - - for ( UniqueValue value : uniqueValues ) { - uniqueCleanupBatch.mergeShallow( uniqueValueSerializationStrategy.delete( scope, value ) ); - } - - try { - uniqueCleanupBatch.execute(); - } - catch ( ConnectionException e ) { - throw new RuntimeException( "Unable to execute batch mutation", e ); - } - } - } ).subscribeOn( Schedulers.io() ).countLong().toBlocking(); - - - //start calling the listeners for remove log entries - BlockingObservable<Long> versionsDeletedObservable = - - Observable.create( new ObservableIterator<MvccLogEntry>( "Log entry iterator" ) { - @Override - protected Iterator<MvccLogEntry> getIterator() { - - return new LogEntryIterator( logEntrySerializationStrategy, scope, entityId, version, - serializationFig.getBufferSize() ); - } - } ) - //skip current version - .skipWhile( new Func1<MvccLogEntry, Boolean>() { - @Override - public Boolean call( final MvccLogEntry mvccLogEntry ) { - return !includeVersion && version.equals( mvccLogEntry.getVersion() ); - } - } ) - //buffer them for efficiency - .buffer( serializationFig.getBufferSize() ).doOnNext( new Action1<List<MvccLogEntry>>() { - @Override - public void call( final List<MvccLogEntry> mvccEntities ) { - - fireEvents( mvccEntities ); - - final MutationBatch logCleanupBatch = keyspace.prepareMutationBatch(); - - - for ( MvccLogEntry entry : mvccEntities ) { - logCleanupBatch.mergeShallow( logEntrySerializationStrategy.delete( scope, entityId, entry.getVersion() )); - } - - try { - logCleanupBatch.execute(); - } - catch ( ConnectionException e ) { - throw new RuntimeException( "Unable to execute batch mutation", e ); - } - } - } ).subscribeOn( Schedulers.io() ).countLong().toBlocking(); - - //wait or this to complete - final Long removedCount = uniqueValueCleanup.last(); - - logger.debug( "Removed unique values for {} entities of entity {}", removedCount, entityId ); - - final Long versionCleanupCount = versionsDeletedObservable.last(); - - logger.debug( "Removed {} previous entity versions of entity {}", versionCleanupCount, entityId ); - - return null; - } - - - private void fireEvents( final List<MvccLogEntry> versions ) { - - final int listenerSize = listeners.size(); - - if ( listenerSize == 0 ) { - return; - } - - if ( listenerSize == 1 ) { - listeners.iterator().next().versionDeleted( scope, entityId, versions ); - return; - } - - logger.debug( "Started firing {} listeners", listenerSize ); - - //if we have more than 1, run them on the rx scheduler for a max of 8 operations at a time - - - //if we have more than 1, run them on the rx scheduler for a max of 10 operations at a time - Observable.from(listeners).flatMap( currentListener -> Observable.just( currentListener ).doOnNext( listener -> { - listener.versionDeleted( scope, entityId, versions ); - } ).subscribeOn( Schedulers.io() ), 10 ).toBlocking().last(); - - - - logger.debug( "Finished firing {} listeners", listenerSize ); - } -} - - - http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3e2afe23/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCreatedTask.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCreatedTask.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCreatedTask.java deleted file mode 100644 index fbbcdbd..0000000 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCreatedTask.java +++ /dev/null @@ -1,115 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. The ASF licenses this file to You - * under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. For additional information regarding - * copyright in this work, please see the NOTICE file in the top level - * directory of this distribution. - */ -package org.apache.usergrid.persistence.collection.impl; - - -import java.util.Set; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.usergrid.persistence.collection.event.EntityVersionCreated; -import org.apache.usergrid.persistence.core.scope.ApplicationScope; -import org.apache.usergrid.persistence.core.task.Task; -import org.apache.usergrid.persistence.model.entity.Entity; - -import com.google.inject.Inject; -import com.google.inject.assistedinject.Assisted; - -import rx.Observable; -import rx.schedulers.Schedulers; - - -/** - * Fires events so that all EntityVersionCreated handlers area called. - */ -public class EntityVersionCreatedTask implements Task<Void> { - private static final Logger logger = LoggerFactory.getLogger( EntityVersionCreatedTask.class ); - - private Set<EntityVersionCreated> listeners; - private final ApplicationScope collectionScope; - private final Entity entity; - - - @Inject - public EntityVersionCreatedTask( @Assisted final ApplicationScope collectionScope, - final Set<EntityVersionCreated> listeners, - @Assisted final Entity entity ) { - - this.listeners = listeners; - this.collectionScope = collectionScope; - this.entity = entity; - } - - - @Override - public void exceptionThrown( final Throwable throwable ) { - logger.error( "Unable to run update task for collection {} with entity {} and version {}", - new Object[] { collectionScope, entity}, throwable ); - } - - - @Override - public Void rejected() { - - // Our task was rejected meaning our queue was full. - // We need this operation to run, so we'll run it in our current thread - try { - call(); - } - catch ( Exception e ) { - throw new RuntimeException( "Exception thrown in call task", e ); - } - - return null; - } - - - @Override - public Void call() throws Exception { - - fireEvents(); - return null; - } - - - private void fireEvents() { - - final int listenerSize = listeners.size(); - - if ( listenerSize == 0 ) { - return; - } - - if ( listenerSize == 1 ) { - listeners.iterator().next().versionCreated( collectionScope, entity ); - return; - } - - logger.debug( "Started firing {} listeners", listenerSize ); - - - Observable.from( listeners ) - .flatMap( currentListener -> Observable.just( currentListener ).doOnNext( listener -> { - listener.versionCreated( collectionScope, entity ); - } ).subscribeOn( Schedulers.io() ), 10 ).toBlocking().last(); - - - logger.debug( "Finished firing {} listeners", listenerSize ); - } -} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3e2afe23/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionTaskFactory.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionTaskFactory.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionTaskFactory.java deleted file mode 100644 index 51a4607..0000000 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionTaskFactory.java +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. The ASF licenses this file to You - * under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. For additional information regarding - * copyright in this work, please see the NOTICE file in the top level - * directory of this distribution. - */ -package org.apache.usergrid.persistence.collection.impl; - - -import java.util.UUID; - -import org.apache.usergrid.persistence.core.scope.ApplicationScope; -import org.apache.usergrid.persistence.model.entity.Entity; -import org.apache.usergrid.persistence.model.entity.Id; - - -public interface EntityVersionTaskFactory { - - /** - * Get a task for cleaning up latent entity data. If includeVersion = true, the passed version will be cleaned up as well - * Otherwise this is a V-1 operation - * - * @param scope - * @param entityId - * @param version - * @param includeVersion - * @return - */ - EntityVersionCleanupTask getCleanupTask( final ApplicationScope scope, final Id entityId, final UUID version, - final boolean includeVersion ); - - /** - * Get an entityVersionCreatedTask - * @param scope - * @param entity - * @return - */ - EntityVersionCreatedTask getCreatedTask( final ApplicationScope scope, final Entity entity ); - - /** - * Get an entity deleted task - * @param collectionScope - * @param entityId - * @param version - * @return - */ - EntityDeletedTask getDeleteTask( final ApplicationScope collectionScope, final Id entityId, final UUID version ); - -}
