WIP overwrite
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/70e0e75a Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/70e0e75a Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/70e0e75a Branch: refs/heads/USERGRID-614 Commit: 70e0e75aae435df0a5045008625e2dc3d7ead37e Parents: 45aed6c Author: Todd Nine <[email protected]> Authored: Thu May 7 10:40:52 2015 -0600 Committer: Todd Nine <[email protected]> Committed: Thu May 7 10:40:52 2015 -0600 ---------------------------------------------------------------------- .../corepersistence/CpEntityManager.java | 2 +- .../corepersistence/CpEntityManagerFactory.java | 2 +- .../corepersistence/CpRelationManager.java | 4 +- .../migration/AppInfoMigrationPlugin.java | 3 +- .../collection/EntityCollectionManager.java | 10 +- .../cache/CachedEntityCollectionManager.java | 10 +- .../EntityCollectionManagerFactoryImpl.java | 20 +- .../impl/EntityCollectionManagerImpl.java | 74 ++++--- .../mvcc/stage/delete/VersionCompact.java | 14 +- .../serialization/SerializationFig.java | 30 +-- .../serialization/impl/LogEntryObservable.java | 54 ----- .../impl/MinMaxLogEntryIterator.java | 27 ++- .../MvccLogEntrySerializationStrategyImpl.java | 28 +-- .../collection/EntityCollectionManagerIT.java | 215 ++++++++++++------- .../impl/MinMaxLogEntryIteratorTest.java | 7 +- ...ccLogEntrySerializationStrategyImplTest.java | 59 ++++- .../collection/util/LogEntryMock.java | 103 ++++----- .../src/test/resources/log4j.properties | 1 + .../usergrid/persistence/graph/GraphFig.java | 26 +-- 19 files changed, 377 insertions(+), 312 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/70e0e75a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java index 460fc11..8bcc73a 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java @@ -664,7 +664,7 @@ public class CpEntityManager implements EntityManager { //delete it asynchronously indexService.queueEntityDelete( applicationScope, entityId ); - return ecm.delete( entityId ); + return ecm.mark( entityId ); } else { return Observable.empty(); http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/70e0e75a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java index 6c375ef..e7ad682 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java @@ -369,7 +369,7 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application } final ApplicationEntityIndex aei = entityIndexFactory.createApplicationEntityIndex(applicationScope); final GraphManager managementGraphManager = managerCache.getGraphManager(managementAppScope); - final Observable deleteNodeGraph = managementGraphManager.deleteNode(applicationId, Long.MAX_VALUE); + final Observable deleteNodeGraph = managementGraphManager.markNode( applicationId, Long.MAX_VALUE ); final Observable deleteAppFromIndex = aei.deleteApplication(); return Observable.concat(copyConnections, deleteNodeGraph, deleteAppFromIndex) http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/70e0e75a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java index 6adeefc..df3fa82 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java @@ -528,7 +528,7 @@ public class CpRelationManager implements RelationManager { //run our delete final Edge collectionToItemEdge = createCollectionEdge( cpHeadEntity.getId(), collName, memberEntity.getId() ); - gm.deleteEdge( collectionToItemEdge ).toBlocking().last(); + gm.markEdge( collectionToItemEdge ).toBlocking().last(); /** @@ -782,7 +782,7 @@ public class CpRelationManager implements RelationManager { //delete all the edges final Edge lastEdge = - gm.loadEdgeVersions( search ).flatMap( returnedEdge -> gm.deleteEdge( returnedEdge ) ).toBlocking() + gm.loadEdgeVersions( search ).flatMap( returnedEdge -> gm.markEdge( returnedEdge ) ).toBlocking() .lastOrDefault( null ); if ( lastEdge != null ) { http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/70e0e75a/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/AppInfoMigrationPlugin.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/AppInfoMigrationPlugin.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/AppInfoMigrationPlugin.java index 30955af..97b87b3 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/AppInfoMigrationPlugin.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/AppInfoMigrationPlugin.java @@ -42,7 +42,6 @@ import org.apache.usergrid.utils.UUIDUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import rx.Observable; -import rx.functions.Func1; import java.util.HashMap; import java.util.Map; @@ -200,7 +199,7 @@ public class AppInfoMigrationPlugin implements MigrationPlugin { final ApplicationScope systemAppScope = getApplicationScope(CpNamingUtils.SYSTEM_APP_ID ); final EntityCollectionManager systemCollectionManager = entityCollectionManagerFactory.createCollectionManager( systemAppScope ); - systemCollectionManager.delete(new SimpleId(uuid, "appinfos")).toBlocking().last(); + systemCollectionManager.mark( new SimpleId( uuid, "appinfos" ) ).toBlocking().last(); } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/70e0e75a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java index 8c27825..9de8f41 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java @@ -46,11 +46,12 @@ public interface EntityCollectionManager { /** - * @param entityId MarkCommit the entity as deleted + * @param entityId MarkCommit the entity as deleted. Will not actually remove it from cassandra. This operation will + * also remove all unique properties for this entity * * @return The observable of the id after the operation has completed */ - Observable<Id> delete( Id entityId ); + Observable<Id> mark( Id entityId ); /** * @param entityId The entity id to load. @@ -104,11 +105,12 @@ public interface EntityCollectionManager { Observable<MvccLogEntry> getVersions(final Id entityId); /** - * Remove these versions. Must be atomic so that read log entries are removed + * Delete these versions from cassandra. Must be atomic so that read log entries are only removed. Entity data + * and log entry will be deleted * @param entries * @return Any observable of all successfully compacted log entries */ - Observable<MvccLogEntry> compact(final Collection<MvccLogEntry> entries); + Observable<MvccLogEntry> delete( final Collection<MvccLogEntry> entries ); /** http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/70e0e75a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/cache/CachedEntityCollectionManager.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/cache/CachedEntityCollectionManager.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/cache/CachedEntityCollectionManager.java index 9412516..7a04b8d 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/cache/CachedEntityCollectionManager.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/cache/CachedEntityCollectionManager.java @@ -85,8 +85,8 @@ public class CachedEntityCollectionManager implements EntityCollectionManager { @Override - public Observable<Id> delete( final Id entityId ) { - return targetEntityCollectionManager.delete( entityId ).doOnNext( new Action1<Id>() { + public Observable<Id> mark( final Id entityId ) { + return targetEntityCollectionManager.mark( entityId ).doOnNext( new Action1<Id>() { @Override public void call( final Id id ) { entityCache.invalidate( id ); @@ -129,13 +129,13 @@ public class CachedEntityCollectionManager implements EntityCollectionManager { @Override public Observable<MvccLogEntry> getVersions( final Id entityId ) { - return null; + return targetEntityCollectionManager.getVersions( entityId ); } @Override - public Observable<MvccLogEntry> compact( final Collection<MvccLogEntry> entries ) { - return targetEntityCollectionManager.compact( entries ); + public Observable<MvccLogEntry> delete( final Collection<MvccLogEntry> entries ) { + return targetEntityCollectionManager.delete( entries ); } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/70e0e75a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java index c4422f7..50a4bfc 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java @@ -29,6 +29,8 @@ import org.apache.usergrid.persistence.collection.cache.CachedEntityCollectionMa import org.apache.usergrid.persistence.collection.cache.EntityCacheFig; import org.apache.usergrid.persistence.collection.mvcc.stage.delete.MarkCommit; import org.apache.usergrid.persistence.collection.mvcc.stage.delete.MarkStart; +import org.apache.usergrid.persistence.collection.mvcc.stage.delete.UniqueCleanup; +import org.apache.usergrid.persistence.collection.mvcc.stage.delete.VersionCompact; import org.apache.usergrid.persistence.collection.mvcc.stage.write.RollbackAction; import org.apache.usergrid.persistence.collection.mvcc.stage.write.WriteCommit; import org.apache.usergrid.persistence.collection.mvcc.stage.write.WriteOptimisticVerify; @@ -36,6 +38,7 @@ import org.apache.usergrid.persistence.collection.mvcc.stage.write.WriteStart; import org.apache.usergrid.persistence.collection.mvcc.stage.write.WriteUniqueVerify; import org.apache.usergrid.persistence.collection.serialization.MvccEntitySerializationStrategy; import org.apache.usergrid.persistence.collection.serialization.MvccLogEntrySerializationStrategy; +import org.apache.usergrid.persistence.collection.serialization.SerializationFig; import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy; import org.apache.usergrid.persistence.core.metrics.MetricsFactory; import org.apache.usergrid.persistence.core.rx.RxTaskScheduler; @@ -65,6 +68,9 @@ public class EntityCollectionManagerFactoryImpl implements EntityCollectionManag private final RollbackAction rollback; private final MarkStart markStart; private final MarkCommit markCommit; + private final UniqueCleanup uniqueCleanup; + private final VersionCompact versionCompact; + private final SerializationFig serializationFig; private final MvccEntitySerializationStrategy entitySerializationStrategy; private final UniqueValueSerializationStrategy uniqueValueSerializationStrategy; private final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy; @@ -80,10 +86,11 @@ public class EntityCollectionManagerFactoryImpl implements EntityCollectionManag //create the target EM that will perform logic final EntityCollectionManager target = new EntityCollectionManagerImpl( writeStart, writeVerifyUnique, - writeOptimisticVerify, writeCommit, rollback, markStart, markCommit, + writeOptimisticVerify, writeCommit, rollback, markStart, markCommit, uniqueCleanup, versionCompact, entitySerializationStrategy, uniqueValueSerializationStrategy, - mvccLogEntrySerializationStrategy, keyspace, scope, metricsFactory, - rxTaskScheduler ); + mvccLogEntrySerializationStrategy, keyspace, + metricsFactory, serializationFig, + rxTaskScheduler, scope ); final EntityCollectionManager proxy = new CachedEntityCollectionManager(entityCacheFig, target ); @@ -98,7 +105,9 @@ public class EntityCollectionManagerFactoryImpl implements EntityCollectionManag final WriteOptimisticVerify writeOptimisticVerify, final WriteCommit writeCommit, final RollbackAction rollback, final MarkStart markStart, final MarkCommit markCommit, - final MvccEntitySerializationStrategy entitySerializationStrategy, + final UniqueCleanup uniqueCleanup, final VersionCompact versionCompact, + final SerializationFig serializationFig, final + MvccEntitySerializationStrategy entitySerializationStrategy, final UniqueValueSerializationStrategy uniqueValueSerializationStrategy, final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy, final Keyspace keyspace, final EntityCacheFig entityCacheFig, @@ -111,6 +120,9 @@ public class EntityCollectionManagerFactoryImpl implements EntityCollectionManag this.rollback = rollback; this.markStart = markStart; this.markCommit = markCommit; + this.uniqueCleanup = uniqueCleanup; + this.versionCompact = versionCompact; + this.serializationFig = serializationFig; this.entitySerializationStrategy = entitySerializationStrategy; this.uniqueValueSerializationStrategy = uniqueValueSerializationStrategy; this.mvccLogEntrySerializationStrategy = mvccLogEntrySerializationStrategy; http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/70e0e75a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java index 83c2035..7a32d72 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java @@ -22,6 +22,7 @@ package org.apache.usergrid.persistence.collection.impl; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.Iterator; import java.util.List; import java.util.UUID; @@ -37,6 +38,8 @@ import org.apache.usergrid.persistence.collection.VersionSet; import org.apache.usergrid.persistence.collection.mvcc.stage.CollectionIoEvent; import org.apache.usergrid.persistence.collection.mvcc.stage.delete.MarkCommit; import org.apache.usergrid.persistence.collection.mvcc.stage.delete.MarkStart; +import org.apache.usergrid.persistence.collection.mvcc.stage.delete.UniqueCleanup; +import org.apache.usergrid.persistence.collection.mvcc.stage.delete.VersionCompact; import org.apache.usergrid.persistence.collection.mvcc.stage.write.RollbackAction; import org.apache.usergrid.persistence.collection.mvcc.stage.write.WriteCommit; import org.apache.usergrid.persistence.collection.mvcc.stage.write.WriteOptimisticVerify; @@ -44,12 +47,15 @@ import org.apache.usergrid.persistence.collection.mvcc.stage.write.WriteStart; import org.apache.usergrid.persistence.collection.mvcc.stage.write.WriteUniqueVerify; import org.apache.usergrid.persistence.collection.serialization.MvccEntitySerializationStrategy; import org.apache.usergrid.persistence.collection.serialization.MvccLogEntrySerializationStrategy; +import org.apache.usergrid.persistence.collection.serialization.SerializationFig; import org.apache.usergrid.persistence.collection.serialization.UniqueValue; import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy; import org.apache.usergrid.persistence.collection.serialization.UniqueValueSet; +import org.apache.usergrid.persistence.collection.serialization.impl.MinMaxLogEntryIterator; import org.apache.usergrid.persistence.collection.serialization.impl.MutableFieldSet; import org.apache.usergrid.persistence.core.metrics.MetricsFactory; import org.apache.usergrid.persistence.core.metrics.ObservableTimer; +import org.apache.usergrid.persistence.core.rx.ObservableIterator; import org.apache.usergrid.persistence.core.rx.RxTaskScheduler; import org.apache.usergrid.persistence.core.scope.ApplicationScope; import org.apache.usergrid.persistence.core.util.Health; @@ -91,6 +97,8 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager { private final WriteOptimisticVerify writeOptimisticVerify; private final WriteCommit writeCommit; private final RollbackAction rollback; + private final UniqueCleanup uniqueCleanup; + private final VersionCompact versionCompact; //delete stages @@ -101,8 +109,8 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager { private final MvccEntitySerializationStrategy entitySerializationStrategy; private final UniqueValueSerializationStrategy uniqueValueSerializationStrategy; + private final SerializationFig serializationFig; - private final RxTaskScheduler rxTaskScheduler; private final Keyspace keyspace; private final Timer writeTimer; @@ -114,7 +122,7 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager { private final Timer getLatestTimer; private final ApplicationScope applicationScope; - + private final RxTaskScheduler rxTaskScheduler; @Inject @@ -122,15 +130,18 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager { final WriteOptimisticVerify writeOptimisticVerify, final WriteCommit writeCommit, final RollbackAction rollback, final MarkStart markStart, final MarkCommit markCommit, + final UniqueCleanup uniqueCleanup, final VersionCompact versionCompact, final MvccEntitySerializationStrategy entitySerializationStrategy, final UniqueValueSerializationStrategy uniqueValueSerializationStrategy, final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy, - final Keyspace keyspace, @Assisted final ApplicationScope applicationScope, - final MetricsFactory metricsFactory, - - final RxTaskScheduler rxTaskScheduler ) { + final Keyspace keyspace, final MetricsFactory metricsFactory, + final SerializationFig serializationFig, final RxTaskScheduler rxTaskScheduler, + @Assisted final ApplicationScope applicationScope ) { this.uniqueValueSerializationStrategy = uniqueValueSerializationStrategy; this.entitySerializationStrategy = entitySerializationStrategy; + this.uniqueCleanup = uniqueCleanup; + this.versionCompact = versionCompact; + this.serializationFig = serializationFig; this.rxTaskScheduler = rxTaskScheduler; ValidationUtils.validateApplicationScope( applicationScope ); @@ -184,14 +195,15 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager { @Override - public Observable<Id> delete( final Id entityId ) { + public Observable<Id> mark( final Id entityId ) { Preconditions.checkNotNull( entityId, "Entity id is required in this stage" ); Preconditions.checkNotNull( entityId.getUuid(), "Entity id is required in this stage" ); Preconditions.checkNotNull( entityId.getType(), "Entity type is required in this stage" ); - Observable<Id> o = Observable.just( new CollectionIoEvent<Id>( applicationScope, entityId ) ).map( markStart ) - .doOnNext( markCommit ).map( entityEvent -> entityEvent.getEvent().getId() ); + Observable<Id> o = Observable.just( new CollectionIoEvent<>( applicationScope, entityId ) ).map( markStart ) + .doOnNext( markCommit ).compose( uniqueCleanup ).map( + entityEvent -> entityEvent.getEvent().getId() ); return ObservableTimer.time( o, deleteTimer ); @@ -205,7 +217,7 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager { Preconditions.checkNotNull( entityId.getUuid(), "Entity id uuid required in load stage" ); Preconditions.checkNotNull( entityId.getType(), "Entity id type required in load stage" ); - final Observable<Entity> entityObservable= load( Collections.singleton( entityId ) ).flatMap( entitySet -> { + final Observable<Entity> entityObservable = load( Collections.singleton( entityId ) ).flatMap( entitySet -> { final MvccEntity entity = entitySet.getEntity( entityId ); if ( entity == null || !entity.getEntity().isPresent() ) { @@ -225,7 +237,7 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager { Preconditions.checkNotNull( entityIds, "entityIds cannot be null" ); - final Observable<EntitySet> entitySetObservable = Observable.create( new Observable.OnSubscribe<EntitySet>() { + final Observable<EntitySet> entitySetObservable = Observable.create( new Observable.OnSubscribe<EntitySet>() { @Override public void call( final Subscriber<? super EntitySet> subscriber ) { @@ -249,21 +261,32 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager { @Override public Observable<MvccLogEntry> getVersions( final Id entityId ) { -// mvccLogEntrySerializationStrategy.load( ) - return null; + ValidationUtils.verifyIdentity( entityId ); + + return Observable.create( new ObservableIterator<MvccLogEntry>( "Log entry iterator" ) { + @Override + protected Iterator<MvccLogEntry> getIterator() { + return new MinMaxLogEntryIterator( mvccLogEntrySerializationStrategy, applicationScope, entityId, + serializationFig.getBufferSize() ); + } + } ); } @Override - public Observable<MvccLogEntry> compact( final Collection<MvccLogEntry> entries ) { - return null; + public Observable<MvccLogEntry> delete( final Collection<MvccLogEntry> entries ) { + Preconditions.checkNotNull( entries, "entries must not be null" ); + + + return Observable.from( entries ).map( logEntry -> new CollectionIoEvent<>( applicationScope, logEntry ) ) + .compose( versionCompact ).map( event -> event.getEvent() ); } @Override public Observable<Id> getIdField( final String type, final Field field ) { final List<Field> fields = Collections.singletonList( field ); - final Observable<Id> idObservable = Observable.from( fields ).map( field1 -> { + final Observable<Id> idObservable = Observable.from( fields ).map( field1 -> { try { final UniqueValueSet set = uniqueValueSerializationStrategy.load( applicationScope, type, fields ); final UniqueValue value = set.getValue( field1.getName() ); @@ -315,8 +338,7 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager { } //Load a entity for each entityId we retrieved. - final EntitySet entitySet = - entitySerializationStrategy.load( applicationScope, entityIds, startTime ); + final EntitySet entitySet = entitySerializationStrategy.load( applicationScope, entityIds, startTime ); //now loop through and ensure the entities are there. final MutationBatch deleteBatch = keyspace.prepareMutationBatch(); @@ -372,11 +394,10 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager { Observable.just( mvccEntityCollectionIoEvent ).subscribeOn( rxTaskScheduler.getAsyncIOScheduler() ) .doOnNext( writeOptimisticVerify ); - final Observable<CollectionIoEvent<MvccEntity>> zip = Observable.zip( uniqueObservable, optimisticObservable, - ( unique, optimistic ) -> optimistic ); + final Observable<CollectionIoEvent<MvccEntity>> zip = + Observable.zip( uniqueObservable, optimisticObservable, ( unique, optimistic ) -> optimistic ); return zip; - } ); } @@ -384,8 +405,8 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager { @Override public Observable<VersionSet> getLatestVersion( final Collection<Id> entityIds ) { - final Timer.Context timer = getLatestTimer.time(); - return Observable.create( new Observable.OnSubscribe<VersionSet>() { + + final Observable<VersionSet> observable = Observable.create( new Observable.OnSubscribe<VersionSet>() { @Override public void call( final Subscriber<? super VersionSet> subscriber ) { @@ -400,12 +421,9 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager { subscriber.onError( e ); } } - } ).doOnCompleted( new Action0() { - @Override - public void call() { - timer.stop(); - } } ); + + return ObservableTimer.time( observable, getLatestTimer ); } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/70e0e75a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/VersionCompact.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/VersionCompact.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/VersionCompact.java index 0945827..424ec86 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/VersionCompact.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/VersionCompact.java @@ -87,13 +87,6 @@ public class VersionCompact final Id entityId = mvccLogEntry.getEntityId(); final UUID version = mvccLogEntry.getVersion(); - //delete from our log - mutationBatch.mergeShallow( logEntrySerializationStrategy.delete( scope, entityId, version ) ); - - //merge our entity delete in - mutationBatch - .mergeShallow( mvccEntitySerializationStrategy.delete( scope, entityId, version ) ); - if ( logger.isDebugEnabled() ) { logger.debug( "Deleting log entry and version data for entity id {} and version {} in app scope {}", @@ -101,6 +94,13 @@ public class VersionCompact } + //delete from our log + mutationBatch.mergeShallow( logEntrySerializationStrategy.delete( scope, entityId, version ) ); + + //merge our entity delete in + mutationBatch + .mergeShallow( mvccEntitySerializationStrategy.delete( scope, entityId, version ) ); + } ) ) http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/70e0e75a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/SerializationFig.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/SerializationFig.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/SerializationFig.java index 381a24e..6591781 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/SerializationFig.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/SerializationFig.java @@ -24,42 +24,18 @@ public interface SerializationFig extends GuicyFig { @Default("5") int getTimeout(); - /** - * Number of history items to return for delete. - * - * @return Timeout in seconds. - */ - @Key("collection.delete.history.size") - @Default("100") - int getHistorySize(); /** * Number of items to buffer. * - * @return Timeout in seconds. + * @return Number of items to buffer in memory */ - @Key("collection.buffer.size") - @Default("10") + @Key("buffer.size") + @Default("100") int getBufferSize(); /** - * The size of threads to have in the task pool - */ - @Key( "collection.task.pool.threadsize" ) - @Default( "20" ) - int getTaskPoolThreadSize(); - - - - /** - * The size of threads to have in the task pool - */ - @Key( "collection.task.pool.queuesize" ) - @Default( "20" ) - int getTaskPoolQueueSize(); - - /** * The maximum amount of entities we can load in a single request * TODO, change this and move it into a common setting that both query and collection share */ http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/70e0e75a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/LogEntryObservable.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/LogEntryObservable.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/LogEntryObservable.java deleted file mode 100644 index 072e0ea..0000000 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/LogEntryObservable.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.usergrid.persistence.collection.serialization.impl; - - -import org.apache.usergrid.persistence.collection.MvccLogEntry; -import org.apache.usergrid.persistence.collection.serialization.MvccLogEntrySerializationStrategy; - -import rx.Observable; -import rx.Subscriber; - - -/** - * An observable that emits log entries from MIN to MAX - */ -public class LogEntryObservable implements Observable.OnSubscribe<MvccLogEntry>{ - - private final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy; - - - public LogEntryObservable( final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy ) { - this.mvccLogEntrySerializationStrategy = mvccLogEntrySerializationStrategy; - } - - - @Override - public void call( final Subscriber<? super MvccLogEntry> subscriber ) { - - subscriber.onStart(); - - while(!subscriber.isUnsubscribed()){ - - - - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/70e0e75a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MinMaxLogEntryIterator.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MinMaxLogEntryIterator.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MinMaxLogEntryIterator.java index a8e15a7..eae8c06 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MinMaxLogEntryIterator.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MinMaxLogEntryIterator.java @@ -35,20 +35,16 @@ public class MinMaxLogEntryIterator implements Iterator<MvccLogEntry> { * @param logEntrySerializationStrategy The serialization strategy to get the log entries * @param scope The scope of the entity * @param entityId The id of the entity - * @param maxVersion The max version of the entity. Iterator will iterate from min to min starting with the version - * < max * @param pageSize The fetch size to get when querying the serialization strategy */ public MinMaxLogEntryIterator( final MvccLogEntrySerializationStrategy logEntrySerializationStrategy, - final ApplicationScope scope, final Id entityId, final UUID maxVersion, - final int pageSize ) { + final ApplicationScope scope, final Id entityId, final int pageSize ) { Preconditions.checkArgument( pageSize > 0, "pageSize must be > 0" ); this.logEntrySerializationStrategy = logEntrySerializationStrategy; this.scope = scope; this.entityId = entityId; - this.nextStart = maxVersion; this.pageSize = pageSize; } @@ -89,19 +85,27 @@ public class MinMaxLogEntryIterator implements Iterator<MvccLogEntry> { */ public void advance() throws ConnectionException { - final int requestedSize = pageSize + 1; + final int requestedSize; + + if ( nextStart != null ) { + requestedSize = pageSize + 1; + } + else { + requestedSize = pageSize; + } //loop through even entry that's < this one and remove it - List<MvccLogEntry> results = logEntrySerializationStrategy.load( scope, entityId, nextStart, requestedSize ); + List<MvccLogEntry> results = logEntrySerializationStrategy.loadReversed( scope, entityId, nextStart, requestedSize ); //we always remove the first version if it's equal since it's returned - if ( results.size() > 0 && results.get( 0 ).getVersion().equals( nextStart ) ) { + if ( nextStart != null && results.size() > 0 && results.get( 0 ).getVersion().equals( nextStart ) ) { results.remove( 0 ); } - //we have results, set our next start - if ( results.size() == pageSize ) { + + //we have results, set our next start. If we miss our start version (due to deletion) and we request a +1, we want to ensure we set our next, hence the >= + if ( results.size() >= pageSize ) { nextStart = results.get( results.size() - 1 ).getVersion(); } //nothing left to do @@ -109,6 +113,9 @@ public class MinMaxLogEntryIterator implements Iterator<MvccLogEntry> { nextStart = null; } + + + elementItr = results.iterator(); } } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/70e0e75a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccLogEntrySerializationStrategyImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccLogEntrySerializationStrategyImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccLogEntrySerializationStrategyImpl.java index 73804e4..0c0d961 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccLogEntrySerializationStrategyImpl.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccLogEntrySerializationStrategyImpl.java @@ -89,19 +89,16 @@ public abstract class MvccLogEntrySerializationStrategyImpl<K> implements MvccLo final UUID colName = entry.getVersion(); final StageStatus stageStatus = new StageStatus( stage, entry.getState() ); - return doWrite( collectionScope, entry.getEntityId(), entry.getVersion(), new RowOp() { - @Override - public void doOp( final ColumnListMutation<UUID> colMutation ) { - - //Write the stage with a timeout, it's set as transient - if ( stage.isTransient() ) { - colMutation.putColumn( colName, stageStatus, SER, fig.getTimeout() ); - return; - } - - //otherwise it's persistent, write it with no expiration - colMutation.putColumn( colName, stageStatus, SER, null ); + return doWrite( collectionScope, entry.getEntityId(), entry.getVersion(), colMutation -> { + + //Write the stage with a timeout, it's set as transient + if ( stage.isTransient() ) { + colMutation.putColumn( colName, stageStatus, SER, fig.getTimeout() ); + return; } + + //otherwise it's persistent, write it with no expiration + colMutation.putColumn( colName, stageStatus, SER, null ); } ); } @@ -253,12 +250,7 @@ public abstract class MvccLogEntrySerializationStrategyImpl<K> implements MvccLo Preconditions.checkNotNull( entityId, "entityId is required" ); Preconditions.checkNotNull( version, "version context is required" ); - return doWrite( context, entityId, version, new RowOp() { - @Override - public void doOp( final ColumnListMutation<UUID> colMutation ) { - colMutation.deleteColumn( version ); - } - } ); + return doWrite( context, entityId, version, colMutation -> colMutation.deleteColumn( version ) ); } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/70e0e75a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerIT.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerIT.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerIT.java index baceeb4..36faf62 100644 --- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerIT.java +++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerIT.java @@ -91,8 +91,7 @@ public class EntityCollectionManagerIT { public void write() { - ApplicationScope context = - new ApplicationScopeImpl( new SimpleId( "organization" ) ); + ApplicationScope context = new ApplicationScopeImpl( new SimpleId( "organization" ) ); Entity newEntity = new Entity( new SimpleId( "test" ) ); @@ -113,8 +112,7 @@ public class EntityCollectionManagerIT { public void writeWithUniqueValues() { - ApplicationScope context = - new ApplicationScopeImpl( new SimpleId( "organization" ) ); + ApplicationScope context = new ApplicationScopeImpl( new SimpleId( "organization" ) ); EntityCollectionManager manager = factory.createCollectionManager( context ); @@ -146,8 +144,7 @@ public class EntityCollectionManagerIT { public void writeAndLoad() { - ApplicationScope context = - new ApplicationScopeImpl( new SimpleId( "organization" ) ); + ApplicationScope context = new ApplicationScopeImpl( new SimpleId( "organization" ) ); Entity newEntity = new Entity( new SimpleId( "test" ) ); @@ -174,7 +171,7 @@ public class EntityCollectionManagerIT { public void writeLoadDelete() { - ApplicationScope context = new ApplicationScopeImpl( new SimpleId( "organization" ) ); + ApplicationScope context = new ApplicationScopeImpl( new SimpleId( "organization" ) ); Entity newEntity = new Entity( new SimpleId( "test" ) ); EntityCollectionManager manager = factory.createCollectionManager( context ); @@ -185,30 +182,27 @@ public class EntityCollectionManagerIT { assertNotNull( "Id was assigned", createReturned.getId() ); - - UUID version = createReturned.getVersion(); - Observable<Entity> loadObservable = manager.load( createReturned.getId() ); Entity loadReturned = loadObservable.toBlocking().lastOrDefault( null ); assertEquals( "Same value", createReturned, loadReturned ); - manager.delete( createReturned.getId() ).toBlocking().last(); + manager.mark( createReturned.getId() ).toBlocking().last(); loadObservable = manager.load( createReturned.getId() ); //load may return null, use last or default loadReturned = loadObservable.toBlocking().lastOrDefault( null ); - assertNull("Entity was deleted", loadReturned); + assertNull( "Entity was deleted", loadReturned ); } @Test public void writeLoadUpdateLoad() { - ApplicationScope context = new ApplicationScopeImpl( new SimpleId( "organization" ) ); + ApplicationScope context = new ApplicationScopeImpl( new SimpleId( "organization" ) ); Entity newEntity = new Entity( new SimpleId( "test" ) ); newEntity.setField( new IntegerField( "counter", 1 ) ); @@ -217,36 +211,36 @@ public class EntityCollectionManagerIT { Observable<Entity> observable = manager.write( newEntity ); - Entity createReturned = observable.toBlocking().lastOrDefault(null); + Entity createReturned = observable.toBlocking().lastOrDefault( null ); assertNotNull( "Id was assigned", createReturned.getId() ); - Observable<Entity> loadObservable = manager.load(createReturned.getId()); + Observable<Entity> loadObservable = manager.load( createReturned.getId() ); Entity loadReturned = loadObservable.toBlocking().lastOrDefault( null ); assertEquals( "Same value", createReturned, loadReturned ); - assertEquals("Field value correct", createReturned.getField("counter"), loadReturned.getField("counter")); + assertEquals( "Field value correct", createReturned.getField( "counter" ), loadReturned.getField( "counter" ) ); //update the field to 2 createReturned.setField( new IntegerField( "counter", 2 ) ); //wait for the write to complete - manager.write( createReturned ).toBlocking().lastOrDefault(null); + manager.write( createReturned ).toBlocking().lastOrDefault( null ); loadObservable = manager.load( createReturned.getId() ); - loadReturned = loadObservable.toBlocking().lastOrDefault(null); + loadReturned = loadObservable.toBlocking().lastOrDefault( null ); assertEquals( "Same value", createReturned, loadReturned ); - assertEquals("Field value correct", createReturned.getField("counter"), loadReturned.getField("counter")); + assertEquals( "Field value correct", createReturned.getField( "counter" ), loadReturned.getField( "counter" ) ); } @@ -254,30 +248,30 @@ public class EntityCollectionManagerIT { public void writeAndLoadScopeClosure() { - ApplicationScope collectionScope1 = new ApplicationScopeImpl(new SimpleId("organization")); + ApplicationScope collectionScope1 = new ApplicationScopeImpl( new SimpleId( "organization" ) ); Entity newEntity = new Entity( new SimpleId( "test" ) ); - EntityCollectionManager manager = factory.createCollectionManager(collectionScope1); + EntityCollectionManager manager = factory.createCollectionManager( collectionScope1 ); - Observable<Entity> observable = manager.write(newEntity); + Observable<Entity> observable = manager.write( newEntity ); Entity createReturned = observable.toBlocking().lastOrDefault( null ); - assertNotNull("Id was assigned", createReturned.getId()); - assertNotNull("Version was assigned", createReturned.getVersion()); + assertNotNull( "Id was assigned", createReturned.getId() ); + assertNotNull( "Version was assigned", createReturned.getVersion() ); Observable<Entity> loadObservable = manager.load( createReturned.getId() ); Entity loadReturned = loadObservable.toBlocking().lastOrDefault( null ); - assertEquals("Same value", createReturned, loadReturned); + assertEquals( "Same value", createReturned, loadReturned ); - ApplicationScope collectionScope2 = new ApplicationScopeImpl(new SimpleId("organization")); + ApplicationScope collectionScope2 = new ApplicationScopeImpl( new SimpleId( "organization" ) ); //now make sure we can't load it from another scope, using the same org @@ -286,9 +280,7 @@ public class EntityCollectionManagerIT { Entity loaded = manager2.load( createReturned.getId() ).toBlocking().lastOrDefault( null ); - assertNull("CollectionScope works correctly", loaded); - - + assertNull( "CollectionScope works correctly", loaded ); } @@ -296,34 +288,32 @@ public class EntityCollectionManagerIT { public void writeAndGetField() { - ApplicationScope collectionScope1 = new ApplicationScopeImpl(new SimpleId("organization")); + ApplicationScope collectionScope1 = new ApplicationScopeImpl( new SimpleId( "organization" ) ); Entity newEntity = new Entity( new SimpleId( "test" ) ); Field field = new StringField( "testField", "unique", true ); - newEntity.setField(field); + newEntity.setField( field ); EntityCollectionManager manager = factory.createCollectionManager( collectionScope1 ); - Observable<Entity> observable = manager.write(newEntity); + Observable<Entity> observable = manager.write( newEntity ); Entity createReturned = observable.toBlocking().lastOrDefault( null ); assertNotNull( "Id was assigned", createReturned.getId() ); - assertNotNull("Version was assigned", createReturned.getVersion()); + assertNotNull( "Version was assigned", createReturned.getVersion() ); Id id = manager.getIdField( newEntity.getId().getType(), field ).toBlocking().lastOrDefault( null ); assertNotNull( id ); assertEquals( newEntity.getId(), id ); Field fieldNull = new StringField( "testFieldNotThere", "uniquely", true ); - id = manager.getIdField( newEntity.getId().getType(), fieldNull ).toBlocking().lastOrDefault( null ); + id = manager.getIdField( newEntity.getId().getType(), fieldNull ).toBlocking().lastOrDefault( null ); assertNull( id ); } - - @Test public void updateVersioning() { @@ -331,10 +321,10 @@ public class EntityCollectionManagerIT { Entity origEntity = new Entity( new SimpleId( "testUpdate" ) ); origEntity.setField( new StringField( "testField", "value" ) ); - ApplicationScope context = new ApplicationScopeImpl(new SimpleId("organization")); + ApplicationScope context = new ApplicationScopeImpl( new SimpleId( "organization" ) ); EntityCollectionManager manager = factory.createCollectionManager( context ); - Entity returned = manager.write( origEntity ).toBlocking().lastOrDefault(null); + Entity returned = manager.write( origEntity ).toBlocking().lastOrDefault( null ); // note its version UUID oldVersion = returned.getVersion(); @@ -345,7 +335,7 @@ public class EntityCollectionManagerIT { // partial update entity but we don't have version number Entity updateEntity = new Entity( origEntity.getId() ); updateEntity.setField( new StringField( "addedField", "other value" ) ); - manager.write( updateEntity ).toBlocking().lastOrDefault(null); + manager.write( updateEntity ).toBlocking().lastOrDefault( null ); // get entity now, it must have a new version returned = manager.load( origEntity.getId() ).toBlocking().lastOrDefault( null ); @@ -354,14 +344,14 @@ public class EntityCollectionManagerIT { assertNotNull( "A new version must be assigned", newVersion ); // new Version should be > old version - assertTrue(UUIDComparator.staticCompare(newVersion, oldVersion) > 0); + assertTrue( UUIDComparator.staticCompare( newVersion, oldVersion ) > 0 ); } @Test public void writeMultiget() { - final ApplicationScope context = new ApplicationScopeImpl( new SimpleId( "organization" ) ); + final ApplicationScope context = new ApplicationScopeImpl( new SimpleId( "organization" ) ); final EntityCollectionManager manager = factory.createCollectionManager( context ); final int multigetSize = serializationFig.getMaxLoadSize(); @@ -381,10 +371,10 @@ public class EntityCollectionManagerIT { final EntitySet entitySet = manager.load( entityIds ).toBlocking().lastOrDefault( null ); - assertNotNull(entitySet); + assertNotNull( entitySet ); assertEquals( multigetSize, entitySet.size() ); - assertFalse(entitySet.isEmpty()); + assertFalse( entitySet.isEmpty() ); /** * Validate every element exists @@ -405,7 +395,7 @@ public class EntityCollectionManagerIT { @Test public void writeMultigetRepair() { - final ApplicationScope context = new ApplicationScopeImpl( new SimpleId( "organization" ) ); + final ApplicationScope context = new ApplicationScopeImpl( new SimpleId( "organization" ) ); final EntityCollectionManager manager = factory.createCollectionManager( context ); final int multigetSize = serializationFig.getMaxLoadSize(); @@ -444,7 +434,7 @@ public class EntityCollectionManagerIT { assertEquals( "Same entity returned", expected, returned.getEntity().get() ); - assertTrue((Boolean) returned.getEntity().get().getField("updated").getValue()); + assertTrue( ( Boolean ) returned.getEntity().get().getField( "updated" ).getValue() ); } } @@ -452,7 +442,7 @@ public class EntityCollectionManagerIT { @Test( expected = IllegalArgumentException.class ) public void readTooLarge() { - final ApplicationScope context = new ApplicationScopeImpl( new SimpleId( "organization" ) ); + final ApplicationScope context = new ApplicationScopeImpl( new SimpleId( "organization" ) ); final EntityCollectionManager manager = factory.createCollectionManager( context ); final int multigetSize = serializationFig.getMaxLoadSize() + 1; @@ -474,7 +464,7 @@ public class EntityCollectionManagerIT { @Test public void testGetVersion() { - ApplicationScope context = new ApplicationScopeImpl( new SimpleId( "organization" ) ); + ApplicationScope context = new ApplicationScopeImpl( new SimpleId( "organization" ) ); final EntityCollectionManager manager = factory.createCollectionManager( context ); @@ -495,7 +485,7 @@ public class EntityCollectionManagerIT { VersionSet results = - manager.getLatestVersion( Arrays.asList( created1.getId(), created2.getId() ) ).toBlocking().last(); + manager.getLatestVersion( Arrays.asList( created1.getId(), created2.getId() ) ).toBlocking().last(); final MvccLogEntry version1Log = results.getMaxVersion( created1.getId() ); @@ -515,7 +505,7 @@ public class EntityCollectionManagerIT { @Test public void testVersionLogWrite() { - ApplicationScope context = new ApplicationScopeImpl( new SimpleId( "organization" ) ); + ApplicationScope context = new ApplicationScopeImpl( new SimpleId( "organization" ) ); final EntityCollectionManager manager = factory.createCollectionManager( context ); @@ -529,10 +519,10 @@ public class EntityCollectionManagerIT { final UUID v1Version = v1Created.getVersion(); - final VersionSet resultsV1 = manager.getLatestVersion(Arrays.asList(v1Created.getId())).toBlocking().last(); + final VersionSet resultsV1 = manager.getLatestVersion( Arrays.asList( v1Created.getId() ) ).toBlocking().last(); - final MvccLogEntry version1Log = resultsV1.getMaxVersion(v1Created.getId()); + final MvccLogEntry version1Log = resultsV1.getMaxVersion( v1Created.getId() ); assertEquals( v1Created.getId(), version1Log.getEntityId() ); assertEquals( v1Version, version1Log.getVersion() ); assertEquals( MvccLogEntry.State.COMPLETE, version1Log.getState() ); @@ -543,7 +533,7 @@ public class EntityCollectionManagerIT { final UUID v2Version = v2Created.getVersion(); - assertTrue("Newer version in v2", UUIDComparator.staticCompare(v2Version, v1Version) > 0); + assertTrue( "Newer version in v2", UUIDComparator.staticCompare( v2Version, v1Version ) > 0 ); final VersionSet resultsV2 = manager.getLatestVersion( Arrays.asList( v1Created.getId() ) ).toBlocking().last(); @@ -560,7 +550,7 @@ public class EntityCollectionManagerIT { @Test public void testVersionLogUpdate() { - ApplicationScope context = new ApplicationScopeImpl( new SimpleId( "organization" ) ); + ApplicationScope context = new ApplicationScopeImpl( new SimpleId( "organization" ) ); final EntityCollectionManager manager = factory.createCollectionManager( context ); @@ -608,7 +598,7 @@ public class EntityCollectionManagerIT { @Test public void healthTest() { - ApplicationScope context = new ApplicationScopeImpl( new SimpleId( "organization" ) ); + ApplicationScope context = new ApplicationScopeImpl( new SimpleId( "organization" ) ); final EntityCollectionManager manager = factory.createCollectionManager( context ); @@ -632,7 +622,7 @@ public class EntityCollectionManagerIT { final Entity entity = EntityHelper.generateEntity( setSize ); //now we have one massive, entity, save it and retrieve it. - ApplicationScope context = new ApplicationScopeImpl( new SimpleId( "organization" ) ); + ApplicationScope context = new ApplicationScopeImpl( new SimpleId( "organization" ) ); final EntityCollectionManager manager = factory.createCollectionManager( context ); @@ -652,20 +642,21 @@ public class EntityCollectionManagerIT { SetConfigTestBypass.setValueByPass( serializationFig, "getMaxEntitySize", currentMaxSize + "" ); } + @Test public void invalidNameRepair() throws ConnectionException { //write an entity with a unique field - ApplicationScope context = new ApplicationScopeImpl( new SimpleId( "organization" ) ); + ApplicationScope context = new ApplicationScopeImpl( new SimpleId( "organization" ) ); Entity newEntity = new Entity( new SimpleId( "test" ) ); //if we add a second field we get a second entity that is the exact same. Is this expected? - final IntegerField expectedInteger = new IntegerField( "count", 5, true ); - // final StringField expectedString = new StringField( "yes", "fred", true ); + final IntegerField expectedInteger = new IntegerField( "count", 5, true ); + // final StringField expectedString = new StringField( "yes", "fred", true ); newEntity.setField( expectedInteger ); - // newEntity.setField( expectedString ); + // newEntity.setField( expectedString ); EntityCollectionManager manager = factory.createCollectionManager( context ); @@ -677,23 +668,26 @@ public class EntityCollectionManagerIT { assertNotNull( "Id was assigned", createReturned.getId() ); assertNotNull( "Version was assigned", createReturned.getVersion() ); - FieldSet - fieldResults = manager.getEntitiesFromFields( newEntity.getId().getType(), Arrays.<Field>asList( expectedInteger ) ).toBlocking().last(); + FieldSet fieldResults = + manager.getEntitiesFromFields( newEntity.getId().getType(), Arrays.<Field>asList( expectedInteger ) ) + .toBlocking().last(); - assertEquals(1,fieldResults.size()); + assertEquals( 1, fieldResults.size() ); //verify the entity is correct. - assertEquals( "Same value", createReturned, fieldResults.getEntity( expectedInteger ).getEntity().get()); //loadReturned ); + assertEquals( "Same value", createReturned, + fieldResults.getEntity( expectedInteger ).getEntity().get() ); //loadReturned ); //use the entity serializationStrategy to remove the entity data. //do a mark as one test, and a delete as another - entitySerializationStrategy.delete( context,createReturned.getId(),createReturned.getVersion() ).execute(); + entitySerializationStrategy.delete( context, createReturned.getId(), createReturned.getVersion() ).execute(); //try to load via the unique field, should have triggered repair - final FieldSet - results = manager.getEntitiesFromFields( newEntity.getId().getType(), Arrays.<Field>asList( expectedInteger ) ).toBlocking().last(); + final FieldSet results = + manager.getEntitiesFromFields( newEntity.getId().getType(), Arrays.<Field>asList( expectedInteger ) ) + .toBlocking().last(); //verify no entity returned @@ -701,37 +695,104 @@ public class EntityCollectionManagerIT { //user the unique serialization to verify it's been deleted from cassandra - UniqueValueSet uniqueValues = uniqueValueSerializationStrategy.load( context, newEntity.getId().getType(), createReturned.getFields() ); + UniqueValueSet uniqueValues = + uniqueValueSerializationStrategy.load( context, newEntity.getId().getType(), createReturned.getFields() ); assertFalse( uniqueValues.iterator().hasNext() ); - } @Test public void testGetIdField() throws Exception { - ApplicationScope context = new ApplicationScopeImpl( new SimpleId( "organization" ) ); + ApplicationScope context = new ApplicationScopeImpl( new SimpleId( "organization" ) ); EntityCollectionManager manager = factory.createCollectionManager( context ); // create an entity of type "item" with a unique_id field value = 1 Entity entity1 = new Entity( new SimpleId( "item" ) ); - entity1.setField( new StringField( "unique_id", "1", true )); + entity1.setField( new StringField( "unique_id", "1", true ) ); manager.write( entity1 ).toBlocking().last(); - final Observable<Id> idObs = manager.getIdField("item", new StringField("unique_id", "1")); - Id id = idObs.toBlocking().lastOrDefault(null); - assertEquals(entity1.getId(), id); + final Observable<Id> idObs = manager.getIdField( "item", new StringField( "unique_id", "1" ) ); + Id id = idObs.toBlocking().lastOrDefault( null ); + assertEquals( entity1.getId(), id ); // create an entity of type "deleted_item" with a unique_id field value = 1 Entity entity2 = new Entity( new SimpleId( "deleted_item" ) ); - entity2.setField( new StringField( "unique_id", "1", true )); + entity2.setField( new StringField( "unique_id", "1", true ) ); manager = factory.createCollectionManager( context ); manager.write( entity2 ).toBlocking().last(); - final Observable<Id> id2Obs = manager.getIdField("deleted_item", new StringField("unique_id", "1")); - Id id2 = id2Obs.toBlocking().lastOrDefault(null); - assertEquals(entity2.getId(), id2); + final Observable<Id> id2Obs = manager.getIdField( "deleted_item", new StringField( "unique_id", "1" ) ); + Id id2 = id2Obs.toBlocking().lastOrDefault( null ); + assertEquals( entity2.getId(), id2 ); + } + + + @Test + public void writeGetVersionsDelete() { + + ApplicationScope context = new ApplicationScopeImpl( new SimpleId( "organization" ) ); + + Entity entity = new Entity( new SimpleId( "test" ) ); + entity.setField( new IntegerField( "counter", 0 ) ); + + EntityCollectionManager manager = factory.createCollectionManager( context ); + + Entity createReturned = manager.write( entity ).toBlocking().lastOrDefault( null ); + + assertNotNull( "Id was assigned", createReturned.getId() ); + + final int size = 200; + + final Id entityId = createReturned.getId(); + + List<UUID> versions = new ArrayList<>( size ); + versions.add( entity.getVersion() ); + + //write new versions + for ( int i = 1; i < size; i++ ) { + final Entity newEntity = new Entity( entityId ); + + final Entity returnedEntity = manager.write( newEntity ).toBlocking().last(); + + versions.add( returnedEntity.getVersion() ); + } + + + //now get our values, and load the latest version + + final Entity lastVersion = manager.load( entityId ).toBlocking().last(); + + //ensure the latest version is correct + assertEquals( versions.get( versions.size() - 1 ), lastVersion.getVersion() ); + + + // now ensure all versions are correct + final List<MvccLogEntry> entries = manager.getVersions( entityId ).toList().toBlocking().last(); + + + assertEquals( "Same size expected", versions.size(), entries.size() ); + + for ( int i = 0; i < versions.size(); i++ ) { + assertEquals( versions.get( i ), entries.get( i ).getVersion() ); + } + + + //now get all the log versions, and delete them all we do it in 2+ batches to ensure we clean up as expected + manager.getVersions( entityId ).buffer( 100 ).flatMap( bufferList -> manager.delete( bufferList ) ) + .toBlocking().last(); + + + //now load them, there shouldn't be any versions + final List<MvccLogEntry> postDeleteEntries = manager.getVersions( entityId ).toList().toBlocking().last(); + + assertEquals( "All log entries should be removed", 0, postDeleteEntries.size() ); + + final Entity postDeleteLastVersion = manager.load( entityId ).toBlocking().lastOrDefault( null ); + + //ensure the latest version is correct + assertNull( "Last version was deleted", postDeleteLastVersion ); } } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/70e0e75a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MinMaxLogEntryIteratorTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MinMaxLogEntryIteratorTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MinMaxLogEntryIteratorTest.java index c82e1bf..a7210a5 100644 --- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MinMaxLogEntryIteratorTest.java +++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MinMaxLogEntryIteratorTest.java @@ -55,7 +55,7 @@ public class MinMaxLogEntryIteratorTest { //now iterate we should get everything MinMaxLogEntryIterator - itr = new MinMaxLogEntryIterator( logEntrySerializationStrategy, scope, entityId, start, pageSize ); + itr = new MinMaxLogEntryIterator( logEntrySerializationStrategy, scope, entityId, pageSize ); assertFalse( itr.hasNext() ); @@ -111,12 +111,9 @@ public class MinMaxLogEntryIteratorTest { Iterator<MvccLogEntry> expectedEntries = mockResults.getEntries().iterator(); - //this element should be skipped - UUID start = expectedEntries.next().getVersion(); - //now iterate we should get everything MinMaxLogEntryIterator - itr = new MinMaxLogEntryIterator( logEntrySerializationStrategy, scope, entityId, start, pageSize ); + itr = new MinMaxLogEntryIterator( logEntrySerializationStrategy, scope, entityId, pageSize ); while ( expectedEntries.hasNext() && itr.hasNext() ) { http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/70e0e75a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccLogEntrySerializationStrategyImplTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccLogEntrySerializationStrategyImplTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccLogEntrySerializationStrategyImplTest.java index 673903c..9ed254c 100644 --- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccLogEntrySerializationStrategyImplTest.java +++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccLogEntrySerializationStrategyImplTest.java @@ -20,6 +20,7 @@ package org.apache.usergrid.persistence.collection.serialization.impl; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; @@ -223,9 +224,9 @@ public abstract class MvccLogEntrySerializationStrategyImplTest { //now do a range scan from the end - final int half = count/2; + final int half = count / 2; - final List<MvccLogEntry> results = logEntryStrategy.loadReversed( context, id, versions[0], half); + final List<MvccLogEntry> results = logEntryStrategy.loadReversed( context, id, versions[0], half ); assertEquals( half, results.size() ); @@ -238,10 +239,9 @@ public abstract class MvccLogEntrySerializationStrategyImplTest { //now get the next batch - final List<MvccLogEntry> results2 = - logEntryStrategy.loadReversed( context, id, versions[half], count ); + final List<MvccLogEntry> results2 = logEntryStrategy.loadReversed( context, id, versions[half], count ); - assertEquals( half, results2.size()); + assertEquals( half, results2.size() ); for ( int i = 0; i < half; i++ ) { final MvccLogEntry saved = entries[half + i]; @@ -256,12 +256,59 @@ public abstract class MvccLogEntrySerializationStrategyImplTest { logEntryStrategy.delete( context, id, versions[i] ).execute(); } - final List<MvccLogEntry> results3 = logEntryStrategy.loadReversed( context, id, null, versions.length ); + final List<MvccLogEntry> results3 = logEntryStrategy.loadReversed( context, id, null, versions.length ); assertEquals( "All log entries were deleted", 0, results3.size() ); } + @Test + public void createAndDeleteEntries() throws ConnectionException { + + final Id applicationId = new SimpleId( "application" ); + + ApplicationScope context = new ApplicationScopeImpl( applicationId ); + + + final Id id = new SimpleId( "test" ); + + + final int size = 10; + + final List<MvccLogEntry> savedEntries = new ArrayList<>( size ); + + for ( int i = 0; i < size; i++ ) { + final UUID version = UUIDGenerator.newTimeUUID(); + MvccLogEntry saved = new MvccLogEntryImpl( id, version, Stage.COMMITTED, MvccLogEntry.State.COMPLETE ); + logEntryStrategy.write( context, saved ).execute(); + + savedEntries.add( saved ); + } + + //now test we get them all back + + final List<MvccLogEntry> results = logEntryStrategy.loadReversed( context, id, null, size ); + + assertEquals( size, results.size() ); + + //assert they're the same + for ( int i = 0; i < size; i++ ) { + assertEquals( savedEntries.get( i ), results.get( i ) ); + } + + //now delete them all + + for ( final MvccLogEntry mvccLogEntry : savedEntries ) { + logEntryStrategy.delete( context, id, mvccLogEntry.getVersion() ).execute(); + } + + //now get them back, should be empty + final List<MvccLogEntry> emptyResults = logEntryStrategy.loadReversed( context, id, null, size ); + + assertEquals( 0, emptyResults.size() ); + } + + @Test( expected = NullPointerException.class ) public void writeParamsNoContext() throws ConnectionException { logEntryStrategy.write( null, mock( MvccLogEntry.class ) ); http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/70e0e75a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/util/LogEntryMock.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/util/LogEntryMock.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/util/LogEntryMock.java index 93de9d4..cd6ad3d 100644 --- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/util/LogEntryMock.java +++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/util/LogEntryMock.java @@ -20,15 +20,11 @@ package org.apache.usergrid.persistence.collection.util;/* import java.util.ArrayList; import java.util.Collection; -import java.util.Comparator; import java.util.Iterator; import java.util.List; import java.util.TreeMap; import java.util.UUID; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; - import org.apache.usergrid.persistence.collection.MvccLogEntry; import org.apache.usergrid.persistence.collection.mvcc.entity.Stage; import org.apache.usergrid.persistence.collection.mvcc.entity.impl.MvccLogEntryImpl; @@ -50,7 +46,11 @@ import static org.mockito.Mockito.when; public class LogEntryMock { - private final TreeMap<UUID, MvccLogEntry> entries = new TreeMap<>(ReversedUUIDComparator.INSTANCE); + private final TreeMap<UUID, MvccLogEntry> reversedEntries = + new TreeMap<>( ( o1, o2 ) -> UUIDComparator.staticCompare( o1, o2 ) * -1 ); + + private final TreeMap<UUID, MvccLogEntry> entries = + new TreeMap<>( ( o1, o2 ) -> UUIDComparator.staticCompare( o1, o2 ) ); private final Id entityId; @@ -61,78 +61,92 @@ public class LogEntryMock { * @param entityId The entity Id to use * @param versions The versions to use */ - private LogEntryMock(final Id entityId, final List<UUID> versions ) { + private LogEntryMock( final Id entityId, final List<UUID> versions ) { this.entityId = entityId; - for ( UUID version: versions) { - entries.put( version, new MvccLogEntryImpl( entityId, version, Stage.ACTIVE, MvccLogEntry.State.COMPLETE ) ); + for ( UUID version : versions ) { + final MvccLogEntry mvccLogEntry = + new MvccLogEntryImpl( entityId, version, Stage.ACTIVE, MvccLogEntry.State.COMPLETE ); + reversedEntries.put( version, mvccLogEntry ); + entries.put( version, mvccLogEntry ); } } /** * Init the mock with the given data structure + * * @param logEntrySerializationStrategy The strategy to moc - * @param scope - * @throws ConnectionException */ - private void initMock( final MvccLogEntrySerializationStrategy logEntrySerializationStrategy, final ApplicationScope scope ) + private void initMock( final MvccLogEntrySerializationStrategy logEntrySerializationStrategy, + final ApplicationScope scope ) - throws ConnectionException { + throws ConnectionException { //wire up the mocks - when(logEntrySerializationStrategy.load( same( scope ), same( entityId ), any(UUID.class), any(Integer.class) )).thenAnswer( new Answer<List<MvccLogEntry>>() { - + when( logEntrySerializationStrategy + .load( same( scope ), same( entityId ), any( UUID.class ), any( Integer.class ) ) ).thenAnswer( - @Override - public List<MvccLogEntry> answer( final InvocationOnMock invocation ) throws Throwable { + invocation -> { final UUID startVersion = ( UUID ) invocation.getArguments()[2]; - final int count = (Integer)invocation.getArguments()[3]; + final int count = ( Integer ) invocation.getArguments()[3]; final List<MvccLogEntry> results = new ArrayList<>( count ); - final Iterator<MvccLogEntry> itr = entries.tailMap( startVersion, true ).values().iterator(); + final Iterator<MvccLogEntry> itr = reversedEntries.tailMap( startVersion, true ).values().iterator(); - for(int i = 0; i < count && itr.hasNext(); i ++){ + for ( int i = 0; i < count && itr.hasNext(); i++ ) { results.add( itr.next() ); } return results; - } - } ); - } + } ); - /** - * Get the entry at the specified index from high to low - * @param index - * @return - */ - public MvccLogEntry getEntryAtIndex(final int index){ + //mock in reverse - final Iterator<MvccLogEntry> itr = entries.values().iterator(); + when( logEntrySerializationStrategy + .loadReversed( same( scope ), same( entityId ), any( UUID.class ), any( Integer.class ) ) ).thenAnswer( - for(int i = 0; i < index; i ++){ - itr.next(); - } + invocation -> { + final UUID startVersion = ( UUID ) invocation.getArguments()[2]; + final int count = ( Integer ) invocation.getArguments()[3]; + + + final List<MvccLogEntry> results = new ArrayList<>( count ); + + final Iterator<MvccLogEntry> itr; + + if ( startVersion == null ) { + itr = entries.values().iterator(); + } + else { + itr = entries.tailMap( startVersion, true ).values().iterator(); + } + + for ( int i = 0; i < count && itr.hasNext(); i++ ) { + results.add( itr.next() ); + } - return itr.next(); + + return results; + } ); } /** - * * @param logEntrySerializationStrategy The mock to use * @param scope The scope to use * @param entityId The entityId to use * @param versions The versions to mock - * @throws ConnectionException */ - public static LogEntryMock createLogEntryMock(final MvccLogEntrySerializationStrategy logEntrySerializationStrategy, final ApplicationScope scope,final Id entityId, final List<UUID> versions ) + public static LogEntryMock createLogEntryMock( + final MvccLogEntrySerializationStrategy logEntrySerializationStrategy, final ApplicationScope scope, + final Id entityId, final List<UUID> versions ) - throws ConnectionException { + throws ConnectionException { LogEntryMock mock = new LogEntryMock( entityId, versions ); mock.initMock( logEntrySerializationStrategy, scope ); @@ -141,19 +155,12 @@ public class LogEntryMock { } - public Collection<MvccLogEntry> getEntries() { - return entries.values(); + public Collection<MvccLogEntry> getReversedEntries() { + return reversedEntries.values(); } - private static final class ReversedUUIDComparator implements Comparator<UUID> { - - public static final ReversedUUIDComparator INSTANCE = new ReversedUUIDComparator(); - - - @Override - public int compare( final UUID o1, final UUID o2 ) { - return UUIDComparator.staticCompare( o1, o2 ) * -1; - } + public Collection<MvccLogEntry> getEntries() { + return entries.values(); } } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/70e0e75a/stack/corepersistence/collection/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/test/resources/log4j.properties b/stack/corepersistence/collection/src/test/resources/log4j.properties index acf5c39..7b55cf8 100644 --- a/stack/corepersistence/collection/src/test/resources/log4j.properties +++ b/stack/corepersistence/collection/src/test/resources/log4j.properties @@ -33,4 +33,5 @@ log4j.logger.cassandra.db=ERROR #log4j.logger.org.apache.usergrid=DEBUG #log4j.logger.org.apache.usergrid.persistence.collection=TRACE +log4j.logger.org.apache.usergrid.persistence.collection.mvcc.stage.delete.VersionCompact=TRACE http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/70e0e75a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java index 894e74a..2a153e2 100644 --- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java +++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java @@ -31,46 +31,46 @@ import org.safehaus.guicyfig.Key; public interface GraphFig extends GuicyFig { - public static final String SCAN_PAGE_SIZE = "usergrid.graph.scan.page.size"; + String SCAN_PAGE_SIZE = "usergrid.graph.scan.page.size"; - public static final String REPAIR_CONCURRENT_SIZE = "usergrid.graph.repair.concurrent.size"; + String REPAIR_CONCURRENT_SIZE = "usergrid.graph.repair.concurrent.size"; /** * The size of the shards. This is approximate, and should be set lower than what you would like your max to be */ - public static final String SHARD_SIZE = "usergrid.graph.shard.size"; + String SHARD_SIZE = "usergrid.graph.shard.size"; /** * Number of shards we can cache. */ - public static final String SHARD_CACHE_SIZE = "usergrid.graph.shard.cache.size"; + String SHARD_CACHE_SIZE = "usergrid.graph.shard.cache.size"; /** * Get the cache timeout. The local cache will exist for this amount of time max (in millis). */ - public static final String SHARD_CACHE_TIMEOUT = "usergrid.graph.shard.cache.timeout"; + String SHARD_CACHE_TIMEOUT = "usergrid.graph.shard.cache.timeout"; /** * Number of worker threads to refresh the cache */ - public static final String SHARD_CACHE_REFRESH_WORKERS = "usergrid.graph.shard.refresh.worker.count"; + String SHARD_CACHE_REFRESH_WORKERS = "usergrid.graph.shard.refresh.worker.count"; /** * The size of the worker count for shard auditing */ - public static final String SHARD_AUDIT_QUEUE_SIZE = "usergrid.graph.shard.audit.worker.queue.size"; + String SHARD_AUDIT_QUEUE_SIZE = "usergrid.graph.shard.audit.worker.queue.size"; /** * The size of the worker count for shard auditing */ - public static final String SHARD_AUDIT_WORKERS = "usergrid.graph.shard.audit.worker.count"; + String SHARD_AUDIT_WORKERS = "usergrid.graph.shard.audit.worker.count"; - public static final String SHARD_REPAIR_CHANCE = "usergrid.graph.shard.repair.chance"; + String SHARD_REPAIR_CHANCE = "usergrid.graph.shard.repair.chance"; /** @@ -80,14 +80,14 @@ public interface GraphFig extends GuicyFig { * Note that you should also pad this for node clock drift. A good value for this would be 2x the shard cache * timeout + 30 seconds, assuming you have NTP and allow a max drift of 30 seconds */ - public static final String SHARD_MIN_DELTA = "usergrid.graph.shard.min.delta"; + String SHARD_MIN_DELTA = "usergrid.graph.shard.min.delta"; - public static final String COUNTER_WRITE_FLUSH_COUNT = "usergrid.graph.shard.counter.beginFlush.count"; + String COUNTER_WRITE_FLUSH_COUNT = "usergrid.graph.shard.counter.beginFlush.count"; - public static final String COUNTER_WRITE_FLUSH_INTERVAL = "usergrid.graph.shard.counter.beginFlush.interval"; + String COUNTER_WRITE_FLUSH_INTERVAL = "usergrid.graph.shard.counter.beginFlush.interval"; - public static final String COUNTER_WRITE_FLUSH_QUEUE_SIZE = "usergrid.graph.shard.counter.queue.size"; + String COUNTER_WRITE_FLUSH_QUEUE_SIZE = "usergrid.graph.shard.counter.queue.size";
