First pass at refactoring mark + sweep
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/45aed6cc Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/45aed6cc Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/45aed6cc Branch: refs/heads/USERGRID-614 Commit: 45aed6ccd5b459aa6c52ad10f1f7f04e1d5cb1f5 Parents: 36b5bad Author: Todd Nine <[email protected]> Authored: Tue May 5 17:30:08 2015 -0600 Committer: Todd Nine <[email protected]> Committed: Tue May 5 17:30:08 2015 -0600 ---------------------------------------------------------------------- .../collection/EntityCollectionManager.java | 14 + .../cache/CachedEntityCollectionManager.java | 14 + .../collection/event/EntityDeleted.java | 45 -- .../collection/event/EntityVersionCreated.java | 39 - .../collection/event/EntityVersionDeleted.java | 46 -- .../collection/guice/CollectionModule.java | 39 - .../EntityCollectionManagerFactoryImpl.java | 13 +- .../impl/EntityCollectionManagerImpl.java | 352 ++++----- .../collection/impl/EntityDeletedTask.java | 137 ---- .../impl/EntityVersionCleanupTask.java | 247 ------- .../impl/EntityVersionCreatedTask.java | 115 --- .../impl/EntityVersionTaskFactory.java | 60 -- .../mvcc/stage/delete/UniqueCleanup.java | 133 ++++ .../mvcc/stage/delete/VersionCompact.java | 120 ++++ .../MvccLogEntrySerializationStrategy.java | 14 + .../serialization/impl/LogEntryIterator.java | 114 --- .../serialization/impl/LogEntryObservable.java | 54 ++ .../impl/MinMaxLogEntryIterator.java | 114 +++ .../MvccLogEntrySerializationProxyImpl.java | 14 + .../MvccLogEntrySerializationStrategyImpl.java | 57 +- .../migration/MvccEntityDataMigrationImpl.java | 30 +- .../impl/EntityVersionCleanupTaskTest.java | 715 ------------------- .../impl/EntityVersionCreatedTaskTest.java | 241 ------- .../mvcc/stage/delete/UniqueCleanupTest.java | 712 ++++++++++++++++++ .../mvcc/stage/delete/VersionCompactTest.java | 238 ++++++ 
.../impl/LogEntryIteratorTest.java | 132 ---- .../impl/MinMaxLogEntryIteratorTest.java | 134 ++++ ...ccLogEntrySerializationStrategyImplTest.java | 85 ++- .../core/executor/TaskExecutorFactory.java | 95 +++ .../persistence/core/rx/RxTaskScheduler.java | 2 - .../core/task/NamedTaskExecutorImpl.java | 286 -------- .../usergrid/persistence/core/task/Task.java | 48 -- .../persistence/core/task/TaskExecutor.java | 41 -- .../core/task/NamedTaskExecutorImplTest.java | 271 ------- .../persistence/graph/GraphManager.java | 8 +- .../persistence/graph/GraphManagerFactory.java | 2 +- .../persistence/graph/SearchByEdge.java | 6 + .../persistence/graph/SearchByEdgeType.java | 6 + .../persistence/graph/guice/GraphModule.java | 11 - .../graph/impl/GraphManagerImpl.java | 404 ++++------- .../graph/impl/SimpleSearchByEdge.java | 40 +- .../graph/impl/SimpleSearchByEdgeType.java | 29 +- .../shard/impl/ShardGroupCompactionImpl.java | 90 ++- .../graph/CommittedGraphManagerIT.java | 8 +- .../persistence/graph/GraphManagerIT.java | 26 +- .../graph/StorageGraphManagerIT.java | 8 +- .../impl/shard/ShardGroupCompactionTest.java | 6 +- 47 files changed, 2196 insertions(+), 3219 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/45aed6cc/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java index 5a329e3..8c27825 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java +++ 
b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java @@ -96,6 +96,20 @@ public interface EntityCollectionManager { */ Observable<EntitySet> load( Collection<Id> entityIds ); + /** + * Get all versions of the log entry, from Max to min + * @param entityId + * @return An observable stream of mvccLog entries + */ + Observable<MvccLogEntry> getVersions(final Id entityId); + + /** + * Remove these versions. Must be atomic so that read log entries are removed + * @param entries + * @return An observable of all successfully compacted log entries + */ + Observable<MvccLogEntry> compact(final Collection<MvccLogEntry> entries); + /** * Returns health of entity data store. http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/45aed6cc/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/cache/CachedEntityCollectionManager.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/cache/CachedEntityCollectionManager.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/cache/CachedEntityCollectionManager.java index fa35580..9412516 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/cache/CachedEntityCollectionManager.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/cache/CachedEntityCollectionManager.java @@ -26,6 +26,7 @@ import java.util.concurrent.TimeUnit; import org.apache.usergrid.persistence.collection.EntityCollectionManager; import org.apache.usergrid.persistence.collection.EntitySet; import org.apache.usergrid.persistence.collection.FieldSet; +import org.apache.usergrid.persistence.collection.MvccLogEntry; import org.apache.usergrid.persistence.collection.VersionSet; import
org.apache.usergrid.persistence.core.util.Health; import org.apache.usergrid.persistence.model.entity.Entity; @@ -125,6 +126,19 @@ public class CachedEntityCollectionManager implements EntityCollectionManager { return targetEntityCollectionManager.load( entityIds ); } + + @Override + public Observable<MvccLogEntry> getVersions( final Id entityId ) { + return null; + } + + + @Override + public Observable<MvccLogEntry> compact( final Collection<MvccLogEntry> entries ) { + return targetEntityCollectionManager.compact( entries ); + } + + @Override public Health getHealth() { return targetEntityCollectionManager.getHealth(); http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/45aed6cc/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityDeleted.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityDeleted.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityDeleted.java deleted file mode 100644 index 0e9b62e..0000000 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityDeleted.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. The ASF licenses this file to You - * under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. For additional information regarding - * copyright in this work, please see the NOTICE file in the top level - * directory of this distribution. - */ -package org.apache.usergrid.persistence.collection.event; - - -import java.util.UUID; - -import org.apache.usergrid.persistence.core.scope.ApplicationScope; -import org.apache.usergrid.persistence.model.entity.Id; - - -/** - * - * Invoked when an entity is deleted. The delete log entry is not removed until all instances of this listener has completed. - * If any listener fails with an exception, the entity will not be removed. - * - */ -public interface EntityDeleted { - - - /** - * The event fired when an entity is deleted - * - * @param scope The scope of the entity - * @param entityId The id of the entity - * @param version the entity version - */ - public void deleted( final ApplicationScope scope, final Id entityId, final UUID version); - -} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/45aed6cc/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityVersionCreated.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityVersionCreated.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityVersionCreated.java deleted file mode 100644 index 7f1be1a..0000000 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityVersionCreated.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
The ASF licenses this file to You - * under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. For additional information regarding - * copyright in this work, please see the NOTICE file in the top level - * directory of this distribution. - */ -package org.apache.usergrid.persistence.collection.event; - - -import org.apache.usergrid.persistence.core.scope.ApplicationScope; -import org.apache.usergrid.persistence.model.entity.Entity; - - -/** - * Invoked after a new version of an entity has been created. - * The entity should be a complete view of the entity. - */ -public interface EntityVersionCreated { - - /** - * The new version of the entity. Note that this should be a fully merged view of the entity. - * In the case of partial updates, the passed entity should be fully merged with it's previous - * entries. 
- * @param scope The scope of the entity - * @param entity The fully loaded and merged entity - */ - public void versionCreated( final ApplicationScope scope, final Entity entity ); -} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/45aed6cc/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityVersionDeleted.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityVersionDeleted.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityVersionDeleted.java deleted file mode 100644 index 7fd8fe7..0000000 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityVersionDeleted.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. The ASF licenses this file to You - * under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. For additional information regarding - * copyright in this work, please see the NOTICE file in the top level - * directory of this distribution. 
- */ -package org.apache.usergrid.persistence.collection.event; - - -import java.util.List; - -import org.apache.usergrid.persistence.collection.MvccLogEntry; -import org.apache.usergrid.persistence.core.scope.ApplicationScope; -import org.apache.usergrid.persistence.model.entity.Id; - - -/** - * - * Invoked when an entity version is removed. Note that this is not a deletion of the entity - * itself, only the version itself. - * - */ -public interface EntityVersionDeleted { - - /** - * The version specified was removed. - * - * @param scope The scope of the entity - * @param entityId The entity Id that was removed - * @param entityVersions The versions that are to be removed - */ - public void versionDeleted(final ApplicationScope scope, final Id entityId, - final List<MvccLogEntry> entityVersions); - -} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/45aed6cc/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java index b256a44..78c7f37 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java @@ -22,27 +22,14 @@ import org.safehaus.guicyfig.GuicyFigModule; import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory; import org.apache.usergrid.persistence.collection.cache.EntityCacheFig; -import org.apache.usergrid.persistence.collection.event.EntityDeleted; -import org.apache.usergrid.persistence.collection.event.EntityVersionCreated; -import 
org.apache.usergrid.persistence.collection.event.EntityVersionDeleted; import org.apache.usergrid.persistence.collection.impl.EntityCollectionManagerFactoryImpl; -import org.apache.usergrid.persistence.collection.impl.EntityVersionTaskFactory; import org.apache.usergrid.persistence.collection.mvcc.changelog.ChangeLogGenerator; import org.apache.usergrid.persistence.collection.mvcc.changelog.ChangeLogGeneratorImpl; import org.apache.usergrid.persistence.collection.serialization.SerializationFig; -import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy; import org.apache.usergrid.persistence.collection.serialization.impl.SerializationModule; -import org.apache.usergrid.persistence.collection.serialization.impl.UniqueValueSerializationStrategyImpl; import org.apache.usergrid.persistence.collection.service.impl.ServiceModule; -import org.apache.usergrid.persistence.core.task.NamedTaskExecutorImpl; -import org.apache.usergrid.persistence.core.task.TaskExecutor; import com.google.inject.AbstractModule; -import com.google.inject.Inject; -import com.google.inject.Provides; -import com.google.inject.Singleton; -import com.google.inject.assistedinject.FactoryModuleBuilder; -import com.google.inject.multibindings.Multibinder; /** @@ -61,15 +48,7 @@ public abstract class CollectionModule extends AbstractModule { install( new SerializationModule() ); install( new ServiceModule() ); - install ( new FactoryModuleBuilder().build( EntityVersionTaskFactory.class )); - // users of this module can add their own implemementations - // for more information: https://github.com/google/guice/wiki/Multibindings - - Multibinder.newSetBinder( binder(), EntityVersionDeleted.class ); - Multibinder.newSetBinder( binder(), EntityVersionCreated.class ); - Multibinder.newSetBinder( binder(), EntityDeleted.class ); - // create a guice factor for getting our collection manager 
bind(EntityCollectionManagerFactory.class).to(EntityCollectionManagerFactoryImpl.class); @@ -81,24 +60,6 @@ public abstract class CollectionModule extends AbstractModule { configureMigrationProvider(); } -// -// @Provides -// @Singleton -// @Inject -// public WriteStart write (final MvccLogEntrySerializationStrategy logStrategy) { -// final WriteStart writeStart = new WriteStart( logStrategy, MvccEntity.Status.COMPLETE); -// -// return writeStart; -// } - - @Inject - @Singleton - @Provides - @CollectionTaskExecutor - public TaskExecutor collectionTaskExecutor(final SerializationFig serializationFig){ - return new NamedTaskExecutorImpl( "collectiontasks", - serializationFig.getTaskPoolThreadSize(), serializationFig.getTaskPoolQueueSize() ); - } /** http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/45aed6cc/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java index 761c4b5..c4422f7 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java @@ -27,7 +27,6 @@ import org.apache.usergrid.persistence.collection.EntityCollectionManager; import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory; import org.apache.usergrid.persistence.collection.cache.CachedEntityCollectionManager; import org.apache.usergrid.persistence.collection.cache.EntityCacheFig; -import 
org.apache.usergrid.persistence.collection.guice.CollectionTaskExecutor; import org.apache.usergrid.persistence.collection.mvcc.stage.delete.MarkCommit; import org.apache.usergrid.persistence.collection.mvcc.stage.delete.MarkStart; import org.apache.usergrid.persistence.collection.mvcc.stage.write.RollbackAction; @@ -41,7 +40,6 @@ import org.apache.usergrid.persistence.collection.serialization.UniqueValueSeria import org.apache.usergrid.persistence.core.metrics.MetricsFactory; import org.apache.usergrid.persistence.core.rx.RxTaskScheduler; import org.apache.usergrid.persistence.core.scope.ApplicationScope; -import org.apache.usergrid.persistence.core.task.TaskExecutor; import com.google.common.base.Preconditions; import com.google.common.cache.CacheBuilder; @@ -71,8 +69,6 @@ public class EntityCollectionManagerFactoryImpl implements EntityCollectionManag private final UniqueValueSerializationStrategy uniqueValueSerializationStrategy; private final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy; private final Keyspace keyspace; - private final EntityVersionTaskFactory entityVersionTaskFactory; - private final TaskExecutor taskExecutor; private final EntityCacheFig entityCacheFig; private final MetricsFactory metricsFactory; private final RxTaskScheduler rxTaskScheduler; @@ -86,7 +82,7 @@ public class EntityCollectionManagerFactoryImpl implements EntityCollectionManag writeStart, writeVerifyUnique, writeOptimisticVerify, writeCommit, rollback, markStart, markCommit, entitySerializationStrategy, uniqueValueSerializationStrategy, - mvccLogEntrySerializationStrategy, keyspace,entityVersionTaskFactory, taskExecutor, scope, metricsFactory, + mvccLogEntrySerializationStrategy, keyspace, scope, metricsFactory, rxTaskScheduler ); @@ -105,10 +101,7 @@ public class EntityCollectionManagerFactoryImpl implements EntityCollectionManag final MvccEntitySerializationStrategy entitySerializationStrategy, final UniqueValueSerializationStrategy 
uniqueValueSerializationStrategy, final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy, - final Keyspace keyspace, - final EntityVersionTaskFactory entityVersionTaskFactory, - @CollectionTaskExecutor final TaskExecutor taskExecutor, final - EntityCacheFig entityCacheFig, + final Keyspace keyspace, final EntityCacheFig entityCacheFig, MetricsFactory metricsFactory, final RxTaskScheduler rxTaskScheduler ) { this.writeStart = writeStart; @@ -122,8 +115,6 @@ public class EntityCollectionManagerFactoryImpl implements EntityCollectionManag this.uniqueValueSerializationStrategy = uniqueValueSerializationStrategy; this.mvccLogEntrySerializationStrategy = mvccLogEntrySerializationStrategy; this.keyspace = keyspace; - this.entityVersionTaskFactory = entityVersionTaskFactory; - this.taskExecutor = taskExecutor; this.entityCacheFig = entityCacheFig; this.metricsFactory = metricsFactory; this.rxTaskScheduler = rxTaskScheduler; http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/45aed6cc/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java index 6f10e86..83c2035 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java @@ -32,8 +32,8 @@ import org.apache.usergrid.persistence.collection.EntityCollectionManager; import org.apache.usergrid.persistence.collection.EntitySet; import 
org.apache.usergrid.persistence.collection.FieldSet; import org.apache.usergrid.persistence.collection.MvccEntity; +import org.apache.usergrid.persistence.collection.MvccLogEntry; import org.apache.usergrid.persistence.collection.VersionSet; -import org.apache.usergrid.persistence.collection.guice.CollectionTaskExecutor; import org.apache.usergrid.persistence.collection.mvcc.stage.CollectionIoEvent; import org.apache.usergrid.persistence.collection.mvcc.stage.delete.MarkCommit; import org.apache.usergrid.persistence.collection.mvcc.stage.delete.MarkStart; @@ -49,10 +49,9 @@ import org.apache.usergrid.persistence.collection.serialization.UniqueValueSeria import org.apache.usergrid.persistence.collection.serialization.UniqueValueSet; import org.apache.usergrid.persistence.collection.serialization.impl.MutableFieldSet; import org.apache.usergrid.persistence.core.metrics.MetricsFactory; +import org.apache.usergrid.persistence.core.metrics.ObservableTimer; import org.apache.usergrid.persistence.core.rx.RxTaskScheduler; import org.apache.usergrid.persistence.core.scope.ApplicationScope; -import org.apache.usergrid.persistence.core.task.Task; -import org.apache.usergrid.persistence.core.task.TaskExecutor; import org.apache.usergrid.persistence.core.util.Health; import org.apache.usergrid.persistence.core.util.ValidationUtils; import org.apache.usergrid.persistence.model.entity.Entity; @@ -60,7 +59,6 @@ import org.apache.usergrid.persistence.model.entity.Id; import org.apache.usergrid.persistence.model.field.Field; import org.apache.usergrid.persistence.model.util.UUIDGenerator; -import com.codahale.metrics.Meter; import com.codahale.metrics.Timer; import com.google.common.base.Preconditions; import com.google.inject.Inject; @@ -73,13 +71,9 @@ import com.netflix.astyanax.model.ColumnFamily; import com.netflix.astyanax.model.CqlResult; import com.netflix.astyanax.serializers.StringSerializer; -import rx.Notification; import rx.Observable; import rx.Subscriber; import 
rx.functions.Action0; -import rx.functions.Action1; -import rx.functions.Func1; -import rx.schedulers.Schedulers; /** @@ -107,36 +101,31 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager { private final MvccEntitySerializationStrategy entitySerializationStrategy; private final UniqueValueSerializationStrategy uniqueValueSerializationStrategy; - private final EntityVersionTaskFactory entityVersionTaskFactory; - private final TaskExecutor taskExecutor; private final RxTaskScheduler rxTaskScheduler; private final Keyspace keyspace; private final Timer writeTimer; - private final Meter writeMeter; private final Timer deleteTimer; + private final Timer fieldIdTimer; + private final Timer fieldEntityTimer; private final Timer updateTimer; private final Timer loadTimer; private final Timer getLatestTimer; - private final Meter deleteMeter; - private final Meter getLatestMeter; - private final Meter loadMeter; - private final Meter updateMeter; private final ApplicationScope applicationScope; + @Inject public EntityCollectionManagerImpl( final WriteStart writeStart, final WriteUniqueVerify writeVerifyUnique, - final WriteOptimisticVerify writeOptimisticVerify, final WriteCommit - writeCommit, - final RollbackAction rollback, final MarkStart markStart, - final MarkCommit markCommit, final MvccEntitySerializationStrategy entitySerializationStrategy, + final WriteOptimisticVerify writeOptimisticVerify, + final WriteCommit writeCommit, final RollbackAction rollback, + final MarkStart markStart, final MarkCommit markCommit, + final MvccEntitySerializationStrategy entitySerializationStrategy, final UniqueValueSerializationStrategy uniqueValueSerializationStrategy, final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy, - final Keyspace keyspace, final EntityVersionTaskFactory entityVersionTaskFactory, - @CollectionTaskExecutor final TaskExecutor taskExecutor, @Assisted final ApplicationScope applicationScope, + final Keyspace 
keyspace, @Assisted final ApplicationScope applicationScope, final MetricsFactory metricsFactory, final RxTaskScheduler rxTaskScheduler ) { @@ -158,21 +147,16 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager { this.keyspace = keyspace; - this.entityVersionTaskFactory = entityVersionTaskFactory; - this.taskExecutor = taskExecutor; this.applicationScope = applicationScope; this.mvccLogEntrySerializationStrategy = mvccLogEntrySerializationStrategy; - this.writeTimer = metricsFactory.getTimer(EntityCollectionManagerImpl.class,"write.timer"); - this.writeMeter = metricsFactory.getMeter(EntityCollectionManagerImpl.class, "write.meter"); - this.deleteTimer = metricsFactory.getTimer(EntityCollectionManagerImpl.class, "delete.timer"); - this.deleteMeter= metricsFactory.getMeter(EntityCollectionManagerImpl.class, "delete.meter"); - this.updateTimer = metricsFactory.getTimer(EntityCollectionManagerImpl.class, "update.timer"); - this.updateMeter = metricsFactory.getMeter(EntityCollectionManagerImpl.class, "update.meter"); - this.loadTimer = metricsFactory.getTimer(EntityCollectionManagerImpl.class,"load.timer"); - this.loadMeter = metricsFactory.getMeter(EntityCollectionManagerImpl.class, "load.meter"); - this.getLatestTimer = metricsFactory.getTimer(EntityCollectionManagerImpl.class,"latest.timer"); - this.getLatestMeter = metricsFactory.getMeter(EntityCollectionManagerImpl.class, "latest.meter"); + this.writeTimer = metricsFactory.getTimer( EntityCollectionManagerImpl.class, "write" ); + this.deleteTimer = metricsFactory.getTimer( EntityCollectionManagerImpl.class, "delete" ); + this.fieldIdTimer = metricsFactory.getTimer( EntityCollectionManagerImpl.class, "fieldId" ); + this.fieldEntityTimer = metricsFactory.getTimer( EntityCollectionManagerImpl.class, "fieldEntity" ); + this.updateTimer = metricsFactory.getTimer( EntityCollectionManagerImpl.class, "update" ); + this.loadTimer = metricsFactory.getTimer( EntityCollectionManagerImpl.class, 
"load" ); + this.getLatestTimer = metricsFactory.getTimer( EntityCollectionManagerImpl.class, "latest" ); } @@ -192,34 +176,10 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager { Observable<CollectionIoEvent<MvccEntity>> observable = stageRunner( writeData, writeStart ); - // execute all validation stages concurrently. Needs refactored when this is done. - // https://github.com/Netflix/RxJava/issues/627 - // observable = Concurrent.concurrent( observable, Schedulers.io(), new WaitZip(), - // writeVerifyUnique, writeOptimisticVerify ); - final Timer.Context timer = writeTimer.time(); - return observable.map(writeCommit).doOnNext(new Action1<Entity>() { - @Override - public void call(final Entity entity) { - //TODO fire the created task first then the entityVersioncleanup - taskExecutor.submit( entityVersionTaskFactory.getCreatedTask( applicationScope, entity )); - taskExecutor.submit( entityVersionTaskFactory.getCleanupTask( applicationScope, entityId, - entity.getVersion(), false )); - //post-processing to come later. leave it empty for now. - } - }).doOnError( rollback ) - .doOnEach( new Action1<Notification<? super Entity>>() { - @Override - public void call( Notification<? 
super Entity> notification ) { - writeMeter.mark(); - } - } ) - .doOnCompleted( new Action0() { - @Override - public void call() { - timer.stop(); - } - } ); + final Observable<Entity> write = observable.map( writeCommit ); + + return ObservableTimer.time( write, writeTimer ); } @@ -230,36 +190,11 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager { Preconditions.checkNotNull( entityId.getUuid(), "Entity id is required in this stage" ); Preconditions.checkNotNull( entityId.getType(), "Entity type is required in this stage" ); - final Timer.Context timer = deleteTimer.time(); - Observable<Id> o = Observable.just( new CollectionIoEvent<Id>( applicationScope, entityId ) ) - .map(markStart) - .doOnNext( markCommit ) - .map(new Func1<CollectionIoEvent<MvccEntity>, Id>() { - - @Override - public Id call(final CollectionIoEvent<MvccEntity> mvccEntityCollectionIoEvent) { - MvccEntity entity = mvccEntityCollectionIoEvent.getEvent(); - Task<Void> task = entityVersionTaskFactory - .getDeleteTask( applicationScope, entity.getId(), entity.getVersion() ); - taskExecutor.submit(task); - return entity.getId(); - } - } - ) - .doOnNext(new Action1<Id>() { - @Override - public void call(Id id) { - deleteMeter.mark(); - } - }) - .doOnCompleted( new Action0() { - @Override - public void call() { - timer.stop(); - } - } ); + Observable<Id> o = Observable.just( new CollectionIoEvent<Id>( applicationScope, entityId ) ).map( markStart ) + .doOnNext( markCommit ).map( entityEvent -> entityEvent.getEvent().getId() ); - return o; + + return ObservableTimer.time( o, deleteTimer ); } @@ -270,35 +205,19 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager { Preconditions.checkNotNull( entityId.getUuid(), "Entity id uuid required in load stage" ); Preconditions.checkNotNull( entityId.getType(), "Entity id type required in load stage" ); - final Timer.Context timer = loadTimer.time(); - return load( Collections.singleton( entityId ) 
).flatMap(new Func1<EntitySet, Observable<Entity>>() { - @Override - public Observable<Entity> call(final EntitySet entitySet) { - final MvccEntity entity = entitySet.getEntity(entityId); - - if (entity == null || !entity.getEntity().isPresent()) { - return Observable.empty(); - } + final Observable<Entity> entityObservable= load( Collections.singleton( entityId ) ).flatMap( entitySet -> { + final MvccEntity entity = entitySet.getEntity( entityId ); - return Observable.just( entity.getEntity().get() ); + if ( entity == null || !entity.getEntity().isPresent() ) { + return Observable.empty(); } - }) - .doOnNext( new Action1<Entity>() { - @Override - public void call( Entity entity ) { - loadMeter.mark(); - } - } ) - .doOnCompleted( new Action0() { - @Override - public void call() { - timer.stop(); - } - } ); - } + return Observable.just( entity.getEntity().get() ); + } ); + return ObservableTimer.time( entityObservable, loadTimer ); + } @Override @@ -306,15 +225,13 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager { Preconditions.checkNotNull( entityIds, "entityIds cannot be null" ); - final Timer.Context timer = loadTimer.time(); - - return Observable.create( new Observable.OnSubscribe<EntitySet>() { + final Observable<EntitySet> entitySetObservable = Observable.create( new Observable.OnSubscribe<EntitySet>() { @Override public void call( final Subscriber<? 
super EntitySet> subscriber ) { try { final EntitySet results = - entitySerializationStrategy.load( applicationScope, entityIds, UUIDGenerator.newTimeUUID() ); + entitySerializationStrategy.load( applicationScope, entityIds, UUIDGenerator.newTimeUUID() ); subscriber.onNext( results ); subscriber.onCompleted(); @@ -323,148 +240,144 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager { subscriber.onError( e ); } } - } ) - .doOnNext(new Action1<EntitySet>() { - @Override - public void call(EntitySet entitySet) { - loadMeter.mark(); - } - }) - .doOnCompleted( new Action0() { - @Override - public void call() { - timer.stop(); - } - } ); + } ); + + + return ObservableTimer.time( entitySetObservable, loadTimer ); } @Override - public Observable<Id> getIdField(final String type, final Field field ) { + public Observable<MvccLogEntry> getVersions( final Id entityId ) { +// mvccLogEntrySerializationStrategy.load( ) + return null; + } + + + @Override + public Observable<MvccLogEntry> compact( final Collection<MvccLogEntry> entries ) { + return null; + } + + + @Override + public Observable<Id> getIdField( final String type, final Field field ) { final List<Field> fields = Collections.singletonList( field ); - return rx.Observable.from( fields ).map( new Func1<Field, Id>() { - @Override - public Id call( Field field ) { - try { - final UniqueValueSet set = uniqueValueSerializationStrategy.load( applicationScope, type, fields ); - final UniqueValue value = set.getValue( field.getName() ); - return value == null ? null : value.getEntityId(); - } - catch ( ConnectionException e ) { - logger.error( "Failed to getIdField", e ); - throw new RuntimeException( e ); - } + final Observable<Id> idObservable = Observable.from( fields ).map( field1 -> { + try { + final UniqueValueSet set = uniqueValueSerializationStrategy.load( applicationScope, type, fields ); + final UniqueValue value = set.getValue( field1.getName() ); + return value == null ? 
null : value.getEntityId(); + } + catch ( ConnectionException e ) { + logger.error( "Failed to getIdField", e ); + throw new RuntimeException( e ); } } ); + + return ObservableTimer.time( idObservable, fieldIdTimer ); } /** * Retrieves all entities that correspond to each field given in the Collection. - * @param fields - * @return */ @Override - public Observable<FieldSet> getEntitiesFromFields(final String type, final Collection<Field> fields ) { - return rx.Observable.just(fields).map( new Func1<Collection<Field>, FieldSet>() { - @Override - public FieldSet call( Collection<Field> fields ) { - try { + public Observable<FieldSet> getEntitiesFromFields( final String type, final Collection<Field> fields ) { + final Observable<FieldSet> fieldSetObservable = Observable.just( fields ).map( fields1 -> { + try { - final UUID startTime = UUIDGenerator.newTimeUUID(); + final UUID startTime = UUIDGenerator.newTimeUUID(); - //Get back set of unique values that correspond to collection of fields - UniqueValueSet set = uniqueValueSerializationStrategy.load( applicationScope,type, fields ); - - //Short circut if we don't have any uniqueValues from the given fields. - if(!set.iterator().hasNext()){ - return new MutableFieldSet( 0 ); - } + //Get back set of unique values that correspond to collection of fields + UniqueValueSet set = uniqueValueSerializationStrategy.load( applicationScope, type, fields1 ); + //Short circuit if we don't have any uniqueValues from the given fields. 
+ if ( !set.iterator().hasNext() ) { + return new MutableFieldSet( 0 ); + } - //loop through each field, and construct an entity load - List<Id> entityIds = new ArrayList<>(fields.size()); - List<UniqueValue> uniqueValues = new ArrayList<>(fields.size()); - for(final Field expectedField: fields) { + //loop through each field, and construct an entity load + List<Id> entityIds = new ArrayList<>( fields1.size() ); + List<UniqueValue> uniqueValues = new ArrayList<>( fields1.size() ); - UniqueValue value = set.getValue(expectedField.getName()); + for ( final Field expectedField : fields1 ) { - if(value ==null){ - logger.debug( "Field does not correspond to a unique value" ); - } + UniqueValue value = set.getValue( expectedField.getName() ); - entityIds.add(value.getEntityId()); - uniqueValues.add(value); + if ( value == null ) { + logger.debug( "Field does not correspond to a unique value" ); } - //Load a entity for each entityId we retrieved. - final EntitySet entitySet = entitySerializationStrategy.load(applicationScope, entityIds, startTime); - - //now loop through and ensure the entities are there. - final MutationBatch deleteBatch = keyspace.prepareMutationBatch(); - - final MutableFieldSet response = new MutableFieldSet(fields.size()); + entityIds.add( value.getEntityId() ); + uniqueValues.add( value ); + } - for(final UniqueValue expectedUnique: uniqueValues) { - final MvccEntity entity = entitySet.getEntity(expectedUnique.getEntityId()); + //Load a entity for each entityId we retrieved. + final EntitySet entitySet = + entitySerializationStrategy.load( applicationScope, entityIds, startTime ); - //bad unique value, delete this, it's inconsistent - if(entity == null || !entity.getEntity().isPresent()){ - final MutationBatch valueDelete = uniqueValueSerializationStrategy.delete(applicationScope, expectedUnique); - deleteBatch.mergeShallow(valueDelete); - continue; - } + //now loop through and ensure the entities are there. 
+ final MutationBatch deleteBatch = keyspace.prepareMutationBatch(); + final MutableFieldSet response = new MutableFieldSet( fields1.size() ); - //else add it to our result set - response.addEntity(expectedUnique.getField(),entity); + for ( final UniqueValue expectedUnique : uniqueValues ) { + final MvccEntity entity = entitySet.getEntity( expectedUnique.getEntityId() ); + //bad unique value, delete this, it's inconsistent + if ( entity == null || !entity.getEntity().isPresent() ) { + final MutationBatch valueDelete = + uniqueValueSerializationStrategy.delete( applicationScope, expectedUnique ); + deleteBatch.mergeShallow( valueDelete ); + continue; } - //TODO: explore making this an Async process - //We'll repair it again if we have to - deleteBatch.execute(); - return response; + //else add it to our result set + response.addEntity( expectedUnique.getField(), entity ); + } + //TODO: explore making this an Async process + //We'll repair it again if we have to + deleteBatch.execute(); - } - catch ( ConnectionException e ) { - logger.error( "Failed to getIdField", e ); - throw new RuntimeException( e ); - } + return response; + } + catch ( ConnectionException e ) { + logger.error( "Failed to getIdField", e ); + throw new RuntimeException( e ); } } ); - } + return ObservableTimer.time( fieldSetObservable, fieldEntityTimer ); + } // fire the stages public Observable<CollectionIoEvent<MvccEntity>> stageRunner( CollectionIoEvent<Entity> writeData, WriteStart writeState ) { - return Observable.just( writeData ).map( writeState ).doOnNext( new Action1<CollectionIoEvent<MvccEntity>>() { + return Observable.just( writeData ).map( writeState ).flatMap( mvccEntityCollectionIoEvent -> { - @Override - public void call( final CollectionIoEvent<MvccEntity> mvccEntityCollectionIoEvent ) { + Observable<CollectionIoEvent<MvccEntity>> uniqueObservable = + Observable.just( mvccEntityCollectionIoEvent ).subscribeOn( rxTaskScheduler.getAsyncIOScheduler() ) + .doOnNext( writeVerifyUnique 
); - Observable<CollectionIoEvent<MvccEntity>> unique = - Observable.just( mvccEntityCollectionIoEvent ).subscribeOn( rxTaskScheduler.getAsyncIOScheduler() ) - .doOnNext( writeVerifyUnique ); + // optimistic verification + Observable<CollectionIoEvent<MvccEntity>> optimisticObservable = + Observable.just( mvccEntityCollectionIoEvent ).subscribeOn( rxTaskScheduler.getAsyncIOScheduler() ) + .doOnNext( writeOptimisticVerify ); - // optimistic verification - Observable<CollectionIoEvent<MvccEntity>> optimistic = - Observable.just( mvccEntityCollectionIoEvent ).subscribeOn( rxTaskScheduler.getAsyncIOScheduler() ) - .doOnNext( writeOptimisticVerify ); + final Observable<CollectionIoEvent<MvccEntity>> zip = Observable.zip( uniqueObservable, optimisticObservable, + ( unique, optimistic ) -> optimistic ); + return zip; - //wait for both to finish - Observable.merge( unique, optimistic ).toBlocking().last(); - } - } ); + } ); } @@ -478,7 +391,7 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager { public void call( final Subscriber<? 
super VersionSet> subscriber ) { try { final VersionSet logEntries = mvccLogEntrySerializationStrategy - .load( applicationScope, entityIds, UUIDGenerator.newTimeUUID() ); + .load( applicationScope, entityIds, UUIDGenerator.newTimeUUID() ); subscriber.onNext( logEntries ); subscriber.onCompleted(); @@ -487,13 +400,12 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager { subscriber.onError( e ); } } - } ) - .doOnCompleted( new Action0() { - @Override - public void call() { - timer.stop(); - } - } ); + } ).doOnCompleted( new Action0() { + @Override + public void call() { + timer.stop(); + } + } ); } @@ -502,11 +414,11 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager { try { ColumnFamily<String, String> CF_SYSTEM_LOCAL = - new ColumnFamily<String, String>( "system.local", StringSerializer.get(), StringSerializer.get(), - StringSerializer.get() ); + new ColumnFamily<String, String>( "system.local", StringSerializer.get(), StringSerializer.get(), + StringSerializer.get() ); OperationResult<CqlResult<String, String>> result = - keyspace.prepareQuery( CF_SYSTEM_LOCAL ).withCql( "SELECT now() FROM system.local;" ).execute(); + keyspace.prepareQuery( CF_SYSTEM_LOCAL ).withCql( "SELECT now() FROM system.local;" ).execute(); if ( result.getResult().getRows().size() == 1 ) { return Health.GREEN; http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/45aed6cc/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityDeletedTask.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityDeletedTask.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityDeletedTask.java deleted file mode 100644 index 1753d26..0000000 --- 
a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityDeletedTask.java +++ /dev/null @@ -1,137 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.usergrid.persistence.collection.impl; - - -import java.util.Set; -import java.util.UUID; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.usergrid.persistence.collection.event.EntityDeleted; -import org.apache.usergrid.persistence.collection.serialization.MvccEntitySerializationStrategy; -import org.apache.usergrid.persistence.collection.serialization.MvccLogEntrySerializationStrategy; -import org.apache.usergrid.persistence.core.scope.ApplicationScope; -import org.apache.usergrid.persistence.core.task.Task; -import org.apache.usergrid.persistence.model.entity.Id; - -import com.google.inject.Inject; -import com.google.inject.assistedinject.Assisted; -import com.netflix.astyanax.MutationBatch; - -import rx.Observable; -import rx.schedulers.Schedulers; - - -/** - * Fires Cleanup Task - */ -public class EntityDeletedTask implements Task<Void> { - private static final Logger LOG = LoggerFactory.getLogger(EntityDeletedTask.class); - - private final 
EntityVersionTaskFactory entityVersionTaskFactory; - private final MvccLogEntrySerializationStrategy logEntrySerializationStrategy; - private final MvccEntitySerializationStrategy entitySerializationStrategy; - private final Set<EntityDeleted> listeners; - private final ApplicationScope collectionScope; - private final Id entityId; - private final UUID version; - - - @Inject - public EntityDeletedTask( - EntityVersionTaskFactory entityVersionTaskFactory, - final MvccLogEntrySerializationStrategy logEntrySerializationStrategy, - final MvccEntitySerializationStrategy entitySerializationStrategy, - final Set<EntityDeleted> listeners, // MUST be a set or Guice will not inject - @Assisted final ApplicationScope collectionScope, - @Assisted final Id entityId, - @Assisted final UUID version) { - - this.entityVersionTaskFactory = entityVersionTaskFactory; - this.logEntrySerializationStrategy = logEntrySerializationStrategy; - this.entitySerializationStrategy = entitySerializationStrategy; - this.listeners = listeners; - this.collectionScope = collectionScope; - this.entityId = entityId; - this.version = version; - } - - - @Override - public void exceptionThrown(Throwable throwable) { - LOG.error( "Unable to run update task for collection {} with entity {} and version {}", - new Object[] { collectionScope, entityId, version }, throwable ); - } - - - @Override - public Void rejected() { - try { - call(); - } - catch ( Exception e ) { - throw new RuntimeException( "Exception thrown in call task", e ); - } - - return null; - } - - - @Override - public Void call() throws Exception { - - entityVersionTaskFactory.getCleanupTask( collectionScope, entityId, version, true ).call(); - - fireEvents(); - final MutationBatch entityDelete = entitySerializationStrategy.delete(collectionScope, entityId, version); - final MutationBatch logDelete = logEntrySerializationStrategy.delete(collectionScope, entityId, version); - - entityDelete.execute(); - logDelete.execute(); -// - return null; - 
} - - - private void fireEvents() { - final int listenerSize = listeners.size(); - - if ( listenerSize == 0 ) { - return; - } - - if ( listenerSize == 1 ) { - listeners.iterator().next().deleted( collectionScope, entityId,version ); - return; - } - - LOG.debug( "Started firing {} listeners", listenerSize ); - - //if we have more than 1, run them on the rx scheduler for a max of 10 operations at a time - Observable.from(listeners).flatMap( currentListener -> Observable.just( currentListener ).doOnNext( listener -> { - listener.deleted( collectionScope, entityId, version ); - } ).subscribeOn( Schedulers.io() ), 10 ).toBlocking().last(); - - LOG.debug( "Finished firing {} listeners", listenerSize ); - } - - -} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/45aed6cc/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java deleted file mode 100644 index b5f9085..0000000 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java +++ /dev/null @@ -1,247 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. The ASF licenses this file to You - * under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. 
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. For additional information regarding - * copyright in this work, please see the NOTICE file in the top level - * directory of this distribution. - */ -package org.apache.usergrid.persistence.collection.impl; - - -import java.util.Iterator; -import java.util.List; -import java.util.Set; -import java.util.UUID; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.usergrid.persistence.collection.MvccLogEntry; -import org.apache.usergrid.persistence.collection.event.EntityVersionDeleted; -import org.apache.usergrid.persistence.collection.serialization.MvccLogEntrySerializationStrategy; -import org.apache.usergrid.persistence.collection.serialization.SerializationFig; -import org.apache.usergrid.persistence.collection.serialization.UniqueValue; -import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy; -import org.apache.usergrid.persistence.collection.serialization.impl.LogEntryIterator; -import org.apache.usergrid.persistence.core.rx.ObservableIterator; -import org.apache.usergrid.persistence.core.scope.ApplicationScope; -import org.apache.usergrid.persistence.core.task.Task; -import org.apache.usergrid.persistence.model.entity.Id; - -import com.google.inject.Inject; -import com.google.inject.assistedinject.Assisted; -import com.netflix.astyanax.Keyspace; -import com.netflix.astyanax.MutationBatch; -import com.netflix.astyanax.connectionpool.exceptions.ConnectionException; - -import rx.Observable; -import rx.functions.Action1; -import rx.functions.Func1; -import 
rx.observables.BlockingObservable; -import rx.schedulers.Schedulers; - - -/** - * Cleans up previous versions from the specified version. Note that this means the version passed in the io event is - * retained, the range is exclusive. - */ -public class EntityVersionCleanupTask implements Task<Void> { - - private static final Logger logger = LoggerFactory.getLogger( EntityVersionCleanupTask.class ); - - private final Set<EntityVersionDeleted> listeners; - - private final MvccLogEntrySerializationStrategy logEntrySerializationStrategy; - private UniqueValueSerializationStrategy uniqueValueSerializationStrategy; - private final Keyspace keyspace; - - private final SerializationFig serializationFig; - - private final ApplicationScope scope; - private final Id entityId; - private final UUID version; - private final boolean includeVersion; - - - @Inject - public EntityVersionCleanupTask( - final SerializationFig serializationFig, - final MvccLogEntrySerializationStrategy logEntrySerializationStrategy, - final UniqueValueSerializationStrategy uniqueValueSerializationStrategy, - final Keyspace keyspace, - final Set<EntityVersionDeleted> listeners, // MUST be a set or Guice will not inject - @Assisted final ApplicationScope scope, - @Assisted final Id entityId, - @Assisted final UUID version, - @Assisted final boolean includeVersion) { - - this.serializationFig = serializationFig; - this.logEntrySerializationStrategy = logEntrySerializationStrategy; - this.uniqueValueSerializationStrategy = uniqueValueSerializationStrategy; - this.keyspace = keyspace; - this.listeners = listeners; - this.scope = scope; - this.entityId = entityId; - this.version = version; - - this.includeVersion = includeVersion; - } - - - @Override - public void exceptionThrown( final Throwable throwable ) { - logger.error( "Unable to run update task for collection {} with entity {} and version {}", - new Object[] { scope, entityId, version }, throwable ); - } - - - @Override - public Void rejected() { - 
//Our task was rejected meaning our queue was full. We need this operation to run, - // so we'll run it in our current thread - try { - call(); - } - catch ( Exception e ) { - throw new RuntimeException( "Exception thrown in call task", e ); - } - - return null; - } - - - @Override - public Void call() throws Exception { - //TODO Refactor this logic into a a class that can be invoked from anywhere - //iterate all unique values - final BlockingObservable<Long> uniqueValueCleanup = - Observable.create( new ObservableIterator<UniqueValue>( "Unique value load" ) { - @Override - protected Iterator<UniqueValue> getIterator() { - return uniqueValueSerializationStrategy.getAllUniqueFields( scope, entityId ); - } - } ) - - //skip current versions - .skipWhile( new Func1<UniqueValue, Boolean>() { - @Override - public Boolean call( final UniqueValue uniqueValue ) { - return !includeVersion && version.equals( uniqueValue.getEntityVersion() ); - } - } ) - //buffer our buffer size, then roll them all up in a single batch mutation - .buffer( serializationFig.getBufferSize() ).doOnNext( new Action1<List<UniqueValue>>() { - @Override - public void call( final List<UniqueValue> uniqueValues ) { - final MutationBatch uniqueCleanupBatch = keyspace.prepareMutationBatch(); - - - for ( UniqueValue value : uniqueValues ) { - uniqueCleanupBatch.mergeShallow( uniqueValueSerializationStrategy.delete( scope, value ) ); - } - - try { - uniqueCleanupBatch.execute(); - } - catch ( ConnectionException e ) { - throw new RuntimeException( "Unable to execute batch mutation", e ); - } - } - } ).subscribeOn( Schedulers.io() ).countLong().toBlocking(); - - - //start calling the listeners for remove log entries - BlockingObservable<Long> versionsDeletedObservable = - - Observable.create( new ObservableIterator<MvccLogEntry>( "Log entry iterator" ) { - @Override - protected Iterator<MvccLogEntry> getIterator() { - - return new LogEntryIterator( logEntrySerializationStrategy, scope, entityId, version, - 
serializationFig.getBufferSize() ); - } - } ) - //skip current version - .skipWhile( new Func1<MvccLogEntry, Boolean>() { - @Override - public Boolean call( final MvccLogEntry mvccLogEntry ) { - return !includeVersion && version.equals( mvccLogEntry.getVersion() ); - } - } ) - //buffer them for efficiency - .buffer( serializationFig.getBufferSize() ).doOnNext( new Action1<List<MvccLogEntry>>() { - @Override - public void call( final List<MvccLogEntry> mvccEntities ) { - - fireEvents( mvccEntities ); - - final MutationBatch logCleanupBatch = keyspace.prepareMutationBatch(); - - - for ( MvccLogEntry entry : mvccEntities ) { - logCleanupBatch.mergeShallow( logEntrySerializationStrategy.delete( scope, entityId, entry.getVersion() )); - } - - try { - logCleanupBatch.execute(); - } - catch ( ConnectionException e ) { - throw new RuntimeException( "Unable to execute batch mutation", e ); - } - } - } ).subscribeOn( Schedulers.io() ).countLong().toBlocking(); - - //wait or this to complete - final Long removedCount = uniqueValueCleanup.last(); - - logger.debug( "Removed unique values for {} entities of entity {}", removedCount, entityId ); - - final Long versionCleanupCount = versionsDeletedObservable.last(); - - logger.debug( "Removed {} previous entity versions of entity {}", versionCleanupCount, entityId ); - - return null; - } - - - private void fireEvents( final List<MvccLogEntry> versions ) { - - final int listenerSize = listeners.size(); - - if ( listenerSize == 0 ) { - return; - } - - if ( listenerSize == 1 ) { - listeners.iterator().next().versionDeleted( scope, entityId, versions ); - return; - } - - logger.debug( "Started firing {} listeners", listenerSize ); - - //if we have more than 1, run them on the rx scheduler for a max of 8 operations at a time - - - //if we have more than 1, run them on the rx scheduler for a max of 10 operations at a time - Observable.from(listeners).flatMap( currentListener -> Observable.just( currentListener ).doOnNext( listener -> { 
- listener.versionDeleted( scope, entityId, versions ); - } ).subscribeOn( Schedulers.io() ), 10 ).toBlocking().last(); - - - - logger.debug( "Finished firing {} listeners", listenerSize ); - } -} - - - http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/45aed6cc/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCreatedTask.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCreatedTask.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCreatedTask.java deleted file mode 100644 index fbbcdbd..0000000 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCreatedTask.java +++ /dev/null @@ -1,115 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. The ASF licenses this file to You - * under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. For additional information regarding - * copyright in this work, please see the NOTICE file in the top level - * directory of this distribution. 
- */ -package org.apache.usergrid.persistence.collection.impl; - - -import java.util.Set; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.usergrid.persistence.collection.event.EntityVersionCreated; -import org.apache.usergrid.persistence.core.scope.ApplicationScope; -import org.apache.usergrid.persistence.core.task.Task; -import org.apache.usergrid.persistence.model.entity.Entity; - -import com.google.inject.Inject; -import com.google.inject.assistedinject.Assisted; - -import rx.Observable; -import rx.schedulers.Schedulers; - - -/** - * Fires events so that all EntityVersionCreated handlers area called. - */ -public class EntityVersionCreatedTask implements Task<Void> { - private static final Logger logger = LoggerFactory.getLogger( EntityVersionCreatedTask.class ); - - private Set<EntityVersionCreated> listeners; - private final ApplicationScope collectionScope; - private final Entity entity; - - - @Inject - public EntityVersionCreatedTask( @Assisted final ApplicationScope collectionScope, - final Set<EntityVersionCreated> listeners, - @Assisted final Entity entity ) { - - this.listeners = listeners; - this.collectionScope = collectionScope; - this.entity = entity; - } - - - @Override - public void exceptionThrown( final Throwable throwable ) { - logger.error( "Unable to run update task for collection {} with entity {} and version {}", - new Object[] { collectionScope, entity}, throwable ); - } - - - @Override - public Void rejected() { - - // Our task was rejected meaning our queue was full. 
- // We need this operation to run, so we'll run it in our current thread - try { - call(); - } - catch ( Exception e ) { - throw new RuntimeException( "Exception thrown in call task", e ); - } - - return null; - } - - - @Override - public Void call() throws Exception { - - fireEvents(); - return null; - } - - - private void fireEvents() { - - final int listenerSize = listeners.size(); - - if ( listenerSize == 0 ) { - return; - } - - if ( listenerSize == 1 ) { - listeners.iterator().next().versionCreated( collectionScope, entity ); - return; - } - - logger.debug( "Started firing {} listeners", listenerSize ); - - - Observable.from( listeners ) - .flatMap( currentListener -> Observable.just( currentListener ).doOnNext( listener -> { - listener.versionCreated( collectionScope, entity ); - } ).subscribeOn( Schedulers.io() ), 10 ).toBlocking().last(); - - - logger.debug( "Finished firing {} listeners", listenerSize ); - } -} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/45aed6cc/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionTaskFactory.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionTaskFactory.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionTaskFactory.java deleted file mode 100644 index 51a4607..0000000 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionTaskFactory.java +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. The ASF licenses this file to You - * under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. 
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. For additional information regarding - * copyright in this work, please see the NOTICE file in the top level - * directory of this distribution. - */ -package org.apache.usergrid.persistence.collection.impl; - - -import java.util.UUID; - -import org.apache.usergrid.persistence.core.scope.ApplicationScope; -import org.apache.usergrid.persistence.model.entity.Entity; -import org.apache.usergrid.persistence.model.entity.Id; - - -public interface EntityVersionTaskFactory { - - /** - * Get a task for cleaning up latent entity data. If includeVersion = true, the passed version will be cleaned up as well - * Otherwise this is a V-1 operation - * - * @param scope - * @param entityId - * @param version - * @param includeVersion - * @return - */ - EntityVersionCleanupTask getCleanupTask( final ApplicationScope scope, final Id entityId, final UUID version, - final boolean includeVersion ); - - /** - * Get an entityVersionCreatedTask - * @param scope - * @param entity - * @return - */ - EntityVersionCreatedTask getCreatedTask( final ApplicationScope scope, final Entity entity ); - - /** - * Get an entity deleted task - * @param collectionScope - * @param entityId - * @param version - * @return - */ - EntityDeletedTask getDeleteTask( final ApplicationScope collectionScope, final Id entityId, final UUID version ); - -} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/45aed6cc/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/UniqueCleanup.java 
---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/UniqueCleanup.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/UniqueCleanup.java new file mode 100644 index 0000000..0034f03 --- /dev/null +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/UniqueCleanup.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.usergrid.persistence.collection.mvcc.stage.delete; + + +import java.util.Collections; +import java.util.Iterator; +import java.util.UUID; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.usergrid.persistence.collection.MvccEntity; +import org.apache.usergrid.persistence.collection.mvcc.stage.CollectionIoEvent; +import org.apache.usergrid.persistence.collection.serialization.SerializationFig; +import org.apache.usergrid.persistence.collection.serialization.UniqueValue; +import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy; +import org.apache.usergrid.persistence.core.metrics.MetricsFactory; +import org.apache.usergrid.persistence.core.metrics.ObservableTimer; +import org.apache.usergrid.persistence.core.rx.ObservableIterator; +import org.apache.usergrid.persistence.core.scope.ApplicationScope; +import org.apache.usergrid.persistence.model.entity.Id; + +import com.codahale.metrics.Timer; +import com.fasterxml.uuid.UUIDComparator; +import com.google.inject.Inject; +import com.netflix.astyanax.Keyspace; +import com.netflix.astyanax.MutationBatch; +import com.netflix.astyanax.connectionpool.exceptions.ConnectionException; + +import rx.Observable; + + +/** + * Runs on an entity that as just be mark committed, and removes all unique values <= this entity + */ +public class UniqueCleanup + implements Observable.Transformer<CollectionIoEvent<MvccEntity>, CollectionIoEvent<MvccEntity>> { + + + private static final Logger logger = LoggerFactory.getLogger( UniqueCleanup.class ); + private final Timer uniqueCleanupTimer; + + + private final UniqueValueSerializationStrategy uniqueValueSerializationStrategy; + private final Keyspace keyspace; + + private final SerializationFig serializationFig; + + + @Inject + public UniqueCleanup( final SerializationFig serializationFig, + final UniqueValueSerializationStrategy uniqueValueSerializationStrategy, + final Keyspace keyspace, final 
MetricsFactory metricsFactory ) { + + this.serializationFig = serializationFig; + this.uniqueValueSerializationStrategy = uniqueValueSerializationStrategy; + this.keyspace = keyspace; + this.uniqueCleanupTimer = metricsFactory.getTimer( UniqueCleanup.class, "uniquecleanup" ); + } + + + @Override + public Observable<CollectionIoEvent<MvccEntity>> call( + final Observable<CollectionIoEvent<MvccEntity>> collectionIoEventObservable ) { + + final Observable<CollectionIoEvent<MvccEntity>> outputObservable = + collectionIoEventObservable.doOnNext( mvccEntityCollectionIoEvent -> { + + final Id entityId = mvccEntityCollectionIoEvent.getEvent().getId(); + final ApplicationScope applicationScope = mvccEntityCollectionIoEvent.getEntityCollection(); + final UUID entityVersion = mvccEntityCollectionIoEvent.getEvent().getVersion(); + + //TODO Refactor this logic into a a class that can be invoked from anywhere + //iterate all unique values + final Observable<CollectionIoEvent<MvccEntity>> uniqueValueCleanup = + Observable.create( new ObservableIterator<UniqueValue>( "Unique value load" ) { + @Override + protected Iterator<UniqueValue> getIterator() { + return uniqueValueSerializationStrategy.getAllUniqueFields( applicationScope, entityId ); + } + } ) + + //skip versions > the specified version + .skipWhile( uniqueValue -> { + + final UUID uniqueValueVersion = uniqueValue.getEntityVersion(); + + return UUIDComparator.staticCompare( uniqueValueVersion, entityVersion ) > 0; + } ) + + //buffer our buffer size, then roll them all up in a single batch mutation + .buffer( serializationFig.getBufferSize() ) + + //roll them up + .doOnNext( uniqueValues -> { + final MutationBatch uniqueCleanupBatch = keyspace.prepareMutationBatch(); + + + for ( UniqueValue value : uniqueValues ) { + uniqueCleanupBatch + .mergeShallow( uniqueValueSerializationStrategy.delete( applicationScope, value ) ); + } + + try { + uniqueCleanupBatch.execute(); + } + catch ( ConnectionException e ) { + throw new 
RuntimeException( "Unable to execute batch mutation", e ); + } + } ).lastOrDefault( Collections.emptyList() ).map( list -> mvccEntityCollectionIoEvent ); + } ); + + return ObservableTimer.time( outputObservable, uniqueCleanupTimer ); + } +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/45aed6cc/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/VersionCompact.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/VersionCompact.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/VersionCompact.java new file mode 100644 index 0000000..0945827 --- /dev/null +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/VersionCompact.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.usergrid.persistence.collection.mvcc.stage.delete; + + +import java.util.UUID; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.usergrid.persistence.collection.MvccLogEntry; +import org.apache.usergrid.persistence.collection.mvcc.stage.CollectionIoEvent; +import org.apache.usergrid.persistence.collection.serialization.MvccEntitySerializationStrategy; +import org.apache.usergrid.persistence.collection.serialization.MvccLogEntrySerializationStrategy; +import org.apache.usergrid.persistence.collection.serialization.SerializationFig; +import org.apache.usergrid.persistence.core.metrics.MetricsFactory; +import org.apache.usergrid.persistence.core.metrics.ObservableTimer; +import org.apache.usergrid.persistence.core.scope.ApplicationScope; +import org.apache.usergrid.persistence.model.entity.Id; + +import com.codahale.metrics.Timer; +import com.google.inject.Inject; +import com.netflix.astyanax.Keyspace; +import com.netflix.astyanax.connectionpool.exceptions.ConnectionException; + +import rx.Observable; + + +/** + * Compact all versions on the input observable by removing them from the log, and from the + * versions + */ +public class VersionCompact + implements Observable.Transformer<CollectionIoEvent<MvccLogEntry>, CollectionIoEvent<MvccLogEntry>> { + + private static final Logger logger = LoggerFactory.getLogger( VersionCompact.class ); + + private final MvccLogEntrySerializationStrategy logEntrySerializationStrategy; + private final MvccEntitySerializationStrategy mvccEntitySerializationStrategy; + private final SerializationFig serializationFig; + private final Keyspace keyspace; + private final Timer compactTimer; + + + @Inject + public VersionCompact( final MvccLogEntrySerializationStrategy logEntrySerializationStrategy, + final SerializationFig serializationFig, final Keyspace keyspace, + final MetricsFactory metricsFactory, + final MvccEntitySerializationStrategy mvccEntitySerializationStrategy ) { + 
this.logEntrySerializationStrategy = logEntrySerializationStrategy; + this.serializationFig = serializationFig; + this.keyspace = keyspace; + this.mvccEntitySerializationStrategy = mvccEntitySerializationStrategy; + this.compactTimer = metricsFactory.getTimer( VersionCompact.class, "compact" ); + } + + + @Override + public Observable<CollectionIoEvent<MvccLogEntry>> call( + final Observable<CollectionIoEvent<MvccLogEntry>> collectionIoEventObservable ) { + + + final Observable<CollectionIoEvent<MvccLogEntry>> entryBuffer = + collectionIoEventObservable.buffer( serializationFig.getBufferSize() ).flatMap( + buffer -> Observable.from( buffer ).collect( () -> keyspace.prepareMutationBatch(), + ( ( mutationBatch, mvccLogEntryCollectionIoEvent ) -> { + + final ApplicationScope scope = mvccLogEntryCollectionIoEvent.getEntityCollection(); + final MvccLogEntry mvccLogEntry = mvccLogEntryCollectionIoEvent.getEvent(); + final Id entityId = mvccLogEntry.getEntityId(); + final UUID version = mvccLogEntry.getVersion(); + + //delete from our log + mutationBatch.mergeShallow( logEntrySerializationStrategy.delete( scope, entityId, version ) ); + + //merge our entity delete in + mutationBatch + .mergeShallow( mvccEntitySerializationStrategy.delete( scope, entityId, version ) ); + + if ( logger.isDebugEnabled() ) { + logger.debug( + "Deleting log entry and version data for entity id {} and version {} in app scope {}", + new Object[] { entityId, version, scope } ); + } + + + + + } ) ) + //delete from the entities + .doOnNext( mutationBatch -> { + try { + mutationBatch.execute(); + } + catch ( ConnectionException e ) { + throw new RuntimeException( "Unable to perform batch mutation" ); + } + } ).flatMap( batches -> Observable.from( buffer ) ) ); + + + return ObservableTimer.time( entryBuffer, compactTimer ); + } +} 
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/45aed6cc/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/MvccLogEntrySerializationStrategy.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/MvccLogEntrySerializationStrategy.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/MvccLogEntrySerializationStrategy.java
index 92669a7..5e249ae 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/MvccLogEntrySerializationStrategy.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/MvccLogEntrySerializationStrategy.java
@@ -71,6 +71,20 @@ public interface MvccLogEntrySerializationStrategy extends Migration, VersionedD
      */
     List<MvccLogEntry> load( ApplicationScope applicationScope, Id entityId, UUID version, int maxSize );
 
+
+
+    /**
+     * Load a list of log entries, from lowest to highest version, seeking from minVersion, up to maxSize elements
+     *
+     * @param applicationScope The applicationScope to load the entity from
+     * @param entityId The entity id to load
+     * @param minVersion The min version to seek from. Null is allowed
+     * @param maxSize The maximum size to return. If you receive this size, there may be more versions to load.
+     *
+     * @return A list of log entries up to max size. NOTE(review): presumably ordered min(UUID) => max(UUID) to
+     * match the summary; this text previously said max(UUID)=> min(UUID) -- confirm against the implementation
+     */
+    List<MvccLogEntry> loadReversed( ApplicationScope applicationScope, Id entityId, UUID minVersion, int maxSize );
+
     /**
      * MarkCommit the stage from the applicationScope with the given entityId and version
      *
