Removed duplicate MigrationRule Partially refactored entity serialization.
Finished creating tests for refactoring Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/2bd1c950 Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/2bd1c950 Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/2bd1c950 Branch: refs/heads/USERGRID-250-buffer-size-fix Commit: 2bd1c950f6fab093d458e6c3a55fb7fb8ff28e79 Parents: 39b1576 Author: Todd Nine <[email protected]> Authored: Wed Nov 19 17:01:04 2014 -0700 Committer: Todd Nine <[email protected]> Committed: Wed Nov 19 17:01:04 2014 -0700 ---------------------------------------------------------------------- .../corepersistence/migration/Versions.java | 3 + .../impl/EntityCollectionManagerImpl.java | 241 +++++++++---------- .../mvcc/MvccEntitySerializationStrategy.java | 3 +- .../mvcc/MvccLogEntrySerializationStrategy.java | 3 +- .../mvcc/stage/delete/MarkCommit.java | 3 +- .../mvcc/stage/write/WriteCommit.java | 3 +- .../UniqueValueSerializationStrategy.java | 3 +- .../MvccEntitySerializationStrategyImpl.java | 172 ++----------- ...vccEntitySerializationStrategyProxyImpl.java | 162 +++++++++++++ .../MvccEntitySerializationStrategyV1Impl.java | 193 +++++++++++++++ .../MvccEntitySerializationStrategyV2Impl.java | 194 +++++++++++++++ .../MvccLogEntrySerializationStrategyImpl.java | 2 +- .../serialization/impl/SerializationModule.java | 21 +- .../UniqueValueSerializationStrategyImpl.java | 2 +- .../EntityCollectionManagerFactoryTest.java | 2 +- .../collection/EntityCollectionManagerIT.java | 74 ++++-- .../EntityCollectionManagerStressTest.java | 2 +- .../EntityCollectionManagerSyncIT.java | 2 +- .../collection/guice/MigrationManagerRule.java | 38 --- .../collection/guice/TestCollectionModule.java | 14 ++ ...niqueValueSerializationStrategyImplTest.java | 2 +- .../mvcc/stage/write/WriteUniqueVerifyIT.java | 2 +- .../mvcc/stage/write/WriteUniqueVerifyTest.java | 2 +- ...MvccEntitySerializationStrategyImplTest.java | 56 ++--- ...cEntitySerializationStrategyProxyV1Test.java | 77 ++++++ ...cEntitySerializationStrategyProxyV2Test.java | 75 ++++++ ...ccEntitySerializationStrategyV1ImplTest.java | 46 ++++ ...ccEntitySerializationStrategyV2ImplTest.java | 47 ++++ .../impl/MvccLESSTransientTest.java | 2 +- ...ccLogEntrySerializationStrategyImplTest.java | 2 +- .../src/test/resources/log4j.properties | 21 +- .../core/guice/MaxMigrationModule.java | 39 +++ .../core/guice/MaxMigrationVersion.java | 40 +++ .../core/guice/MigrationManagerRule.java | 19 +- .../persistence/graph/GraphManagerIT.java | 2 +- .../persistence/graph/GraphManagerLoadTest.java | 2 +- .../graph/GraphManagerShardingIT.java | 2 +- .../graph/GraphManagerStressTest.java | 2 +- .../usergrid/persistence/graph/SimpleTest.java | 2 +- .../graph/guice/TestGraphModule.java | 11 + .../graph/impl/EdgeDeleteListenerTest.java | 2 +- .../graph/impl/NodeDeleteListenerTest.java | 2 +- .../graph/impl/stage/EdgeDeleteRepairTest.java | 2 +- .../graph/impl/stage/EdgeMetaRepairTest.java | 2 +- .../EdgeMetadataSerializationTest.java | 2 +- .../EdgeSerializationChopTest.java | 2 +- .../serialization/EdgeSerializationTest.java | 2 +- .../serialization/NodeSerializationTest.java | 2 +- .../impl/shard/EdgeShardSerializationTest.java | 2 +- .../NodeShardCounterSerializationTest.java | 2 +- .../persistence/map/MapManagerTest.java | 2 +- 51 files changed, 1205 insertions(+), 405 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2bd1c950/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/Versions.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/Versions.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/Versions.java index b4fe095..99067b7 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/Versions.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/Versions.java @@ -22,6 +22,7 @@ package org.apache.usergrid.corepersistence.migration; +import org.apache.usergrid.persistence.collection.serialization.impl.MvccEntitySerializationStrategyProxyImpl; import org.apache.usergrid.persistence.graph.serialization.impl.EdgeMetadataSerializationProxyImpl; @@ -39,4 +40,6 @@ public class Versions { * Version 2. Edge meta changes */ public static final int VERSION_2 = EdgeMetadataSerializationProxyImpl.MIGRATION_VERSION; + + public static final int VERSION_3 = MvccEntitySerializationStrategyProxyImpl.MIGRATION_VERSION; } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2bd1c950/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java index 919e83b..d54c5f7 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java @@ -23,23 +23,18 @@ import java.util.Collection; import java.util.Collections; import java.util.List; -import com.netflix.astyanax.connectionpool.exceptions.ConnectionException; -import org.apache.usergrid.persistence.collection.serialization.UniqueValue; -import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy; -import org.apache.usergrid.persistence.collection.serialization.UniqueValueSet; -import org.apache.usergrid.persistence.model.field.Field; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.usergrid.persistence.collection.CollectionScope; import org.apache.usergrid.persistence.collection.EntityCollectionManager; import org.apache.usergrid.persistence.collection.EntitySet; +import org.apache.usergrid.persistence.collection.MvccEntity; import org.apache.usergrid.persistence.collection.VersionSet; import org.apache.usergrid.persistence.collection.guice.Write; import org.apache.usergrid.persistence.collection.guice.WriteUpdate; import org.apache.usergrid.persistence.collection.mvcc.MvccEntitySerializationStrategy; import org.apache.usergrid.persistence.collection.mvcc.MvccLogEntrySerializationStrategy; -import org.apache.usergrid.persistence.collection.MvccEntity; import org.apache.usergrid.persistence.collection.mvcc.entity.MvccValidationUtils; import org.apache.usergrid.persistence.collection.mvcc.stage.CollectionIoEvent; import org.apache.usergrid.persistence.collection.mvcc.stage.delete.MarkCommit; @@ -49,9 +44,16 @@ import org.apache.usergrid.persistence.collection.mvcc.stage.write.WriteCommit; import org.apache.usergrid.persistence.collection.mvcc.stage.write.WriteOptimisticVerify; import org.apache.usergrid.persistence.collection.mvcc.stage.write.WriteStart; import org.apache.usergrid.persistence.collection.mvcc.stage.write.WriteUniqueVerify; +import org.apache.usergrid.persistence.collection.serialization.SerializationFig; +import org.apache.usergrid.persistence.collection.serialization.UniqueValue; +import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy; +import org.apache.usergrid.persistence.collection.serialization.UniqueValueSet; +import org.apache.usergrid.persistence.core.guice.ProxyImpl; +import org.apache.usergrid.persistence.core.util.Health; import org.apache.usergrid.persistence.core.util.ValidationUtils; import org.apache.usergrid.persistence.model.entity.Entity; import org.apache.usergrid.persistence.model.entity.Id; +import org.apache.usergrid.persistence.model.field.Field; import org.apache.usergrid.persistence.model.util.UUIDGenerator; import com.google.common.base.Preconditions; @@ -59,11 +61,10 @@ import com.google.inject.Inject; import com.google.inject.assistedinject.Assisted; import com.netflix.astyanax.Keyspace; import com.netflix.astyanax.connectionpool.OperationResult; +import com.netflix.astyanax.connectionpool.exceptions.ConnectionException; import com.netflix.astyanax.model.ColumnFamily; import com.netflix.astyanax.model.CqlResult; import com.netflix.astyanax.serializers.StringSerializer; -import org.apache.usergrid.persistence.collection.serialization.SerializationFig; -import org.apache.usergrid.persistence.core.util.Health; import rx.Observable; import rx.Subscriber; @@ -73,13 +74,12 @@ import rx.schedulers.Schedulers; /** - * Simple implementation. Should perform writes, delete and load. - * <p/> - * TODO: maybe refactor the stage operations into their own classes for clarity and organization? + * Simple implementation. Should perform writes, delete and load. <p/> TODO: maybe refactor the stage operations into + * their own classes for clarity and organization? */ public class EntityCollectionManagerImpl implements EntityCollectionManager { - private static final Logger logger = LoggerFactory.getLogger(EntityCollectionManagerImpl.class); + private static final Logger logger = LoggerFactory.getLogger( EntityCollectionManagerImpl.class ); private final CollectionScope collectionScope; @@ -106,24 +106,20 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager { @Inject - public EntityCollectionManagerImpl( - @Write final WriteStart writeStart, - @WriteUpdate final WriteStart writeUpdate, - final WriteUniqueVerify writeVerifyUnique, - final WriteOptimisticVerify writeOptimisticVerify, - final WriteCommit writeCommit, final RollbackAction rollback, - final MarkStart markStart, final MarkCommit markCommit, - final MvccEntitySerializationStrategy entitySerializationStrategy, - final UniqueValueSerializationStrategy uniqueValueSerializationStrategy, - final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy, - final Keyspace keyspace, - final SerializationFig config, - @Assisted final CollectionScope collectionScope - ) { + public EntityCollectionManagerImpl( @Write final WriteStart writeStart, @WriteUpdate final WriteStart writeUpdate, + final WriteUniqueVerify writeVerifyUnique, + final WriteOptimisticVerify writeOptimisticVerify, + final WriteCommit writeCommit, final RollbackAction rollback, + final MarkStart markStart, final MarkCommit markCommit, + @ProxyImpl final MvccEntitySerializationStrategy entitySerializationStrategy, + final UniqueValueSerializationStrategy uniqueValueSerializationStrategy, + final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy, + final Keyspace keyspace, final SerializationFig config, + @Assisted final CollectionScope collectionScope ) { this.uniqueValueSerializationStrategy = uniqueValueSerializationStrategy; this.entitySerializationStrategy = entitySerializationStrategy; - MvccValidationUtils.validateCollectionScope(collectionScope); + MvccValidationUtils.validateCollectionScope( collectionScope ); this.writeStart = writeStart; this.writeUpdate = writeUpdate; @@ -145,20 +141,20 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager { @Override - public Observable<Entity> write(final Entity entity) { + public Observable<Entity> write( final Entity entity ) { //do our input validation - Preconditions.checkNotNull(entity, "Entity is required in the new stage of the mvcc write"); + Preconditions.checkNotNull( entity, "Entity is required in the new stage of the mvcc write" ); final Id entityId = entity.getId(); - ValidationUtils.verifyIdentity(entityId); + ValidationUtils.verifyIdentity( entityId ); // create our observable and start the write - final CollectionIoEvent<Entity> writeData = new CollectionIoEvent<Entity>(collectionScope, entity); + final CollectionIoEvent<Entity> writeData = new CollectionIoEvent<Entity>( collectionScope, entity ); - Observable<CollectionIoEvent<MvccEntity>> observable = stageRunner(writeData, writeStart); + Observable<CollectionIoEvent<MvccEntity>> observable = stageRunner( writeData, writeStart ); // execute all validation stages concurrently. Needs refactored when this is done. // https://github.com/Netflix/RxJava/issues/627 @@ -166,115 +162,119 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager { // writeVerifyUnique, writeOptimisticVerify ); // return the commit result. - return observable.map(writeCommit).doOnError(rollback); + return observable.map( writeCommit ).doOnError( rollback ); } @Override - public Observable<Void> delete(final Id entityId) { - - Preconditions.checkNotNull(entityId, "Entity id is required in this stage"); - Preconditions.checkNotNull(entityId.getUuid(), "Entity id is required in this stage"); - Preconditions.checkNotNull(entityId.getType(), "Entity type is required in this stage"); - - return Observable.from(new CollectionIoEvent<Id>(collectionScope, entityId)).map(markStart) - .doOnNext(markCommit).map(new Func1<CollectionIoEvent<MvccEntity>, Void>() { - @Override - public Void call(final CollectionIoEvent<MvccEntity> mvccEntityCollectionIoEvent) { - return null; - } - }); + public Observable<Void> delete( final Id entityId ) { + + Preconditions.checkNotNull( entityId, "Entity id is required in this stage" ); + Preconditions.checkNotNull( entityId.getUuid(), "Entity id is required in this stage" ); + Preconditions.checkNotNull( entityId.getType(), "Entity type is required in this stage" ); + + return Observable.from( new CollectionIoEvent<Id>( collectionScope, entityId ) ).map( markStart ) + .doOnNext( markCommit ).map( new Func1<CollectionIoEvent<MvccEntity>, Void>() { + @Override + public Void call( final CollectionIoEvent<MvccEntity> mvccEntityCollectionIoEvent ) { + return null; + } + } ); } @Override - public Observable<Entity> load(final Id entityId) { + public Observable<Entity> load( final Id entityId ) { - Preconditions.checkNotNull(entityId, "Entity id required in the load stage"); - Preconditions.checkNotNull(entityId.getUuid(), "Entity id uuid required in load stage"); - Preconditions.checkNotNull(entityId.getType(), "Entity id type required in load stage"); + Preconditions.checkNotNull( entityId, "Entity id required in the load stage" ); + Preconditions.checkNotNull( entityId.getUuid(), "Entity id uuid required in load stage" ); + Preconditions.checkNotNull( entityId.getType(), "Entity id type required in load stage" ); - return load(Collections.singleton(entityId)).map(new Func1<EntitySet, Entity>() { + return load( Collections.singleton( entityId ) ).map( new Func1<EntitySet, Entity>() { @Override - public Entity call(final EntitySet entitySet) { - final MvccEntity entity = entitySet.getEntity(entityId); + public Entity call( final EntitySet entitySet ) { + final MvccEntity entity = entitySet.getEntity( entityId ); - if (entity == null) { + if ( entity == null ) { return null; } return entity.getEntity().orNull(); } - }); + } ); } @Override - public Observable<EntitySet> load(final Collection<Id> entityIds) { + public Observable<EntitySet> load( final Collection<Id> entityIds ) { - Preconditions.checkNotNull(entityIds, "entityIds cannot be null"); + Preconditions.checkNotNull( entityIds, "entityIds cannot be null" ); - return Observable.create(new Observable.OnSubscribe<EntitySet>() { + return Observable.create( new Observable.OnSubscribe<EntitySet>() { @Override - public void call(final Subscriber<? super EntitySet> subscriber) { + public void call( final Subscriber<? super EntitySet> subscriber ) { try { - final EntitySet results = entitySerializationStrategy.load( - collectionScope, entityIds, UUIDGenerator.newTimeUUID()); + final EntitySet results = + entitySerializationStrategy.load( collectionScope, entityIds, UUIDGenerator.newTimeUUID() ); - subscriber.onNext(results); + subscriber.onNext( results ); subscriber.onCompleted(); - } catch (Exception e) { - subscriber.onError(e); + } + catch ( Exception e ) { + subscriber.onError( e ); } } - }); + } ); } + @Override - public Observable<Id> getIdField(final Field field) { - final List<Field> fields = Collections.singletonList(field); - return rx.Observable.from(fields).map(new Func1<Field, Id>() { + public Observable<Id> getIdField( final Field field ) { + final List<Field> fields = Collections.singletonList( field ); + return rx.Observable.from( fields ).map( new Func1<Field, Id>() { @Override - public Id call(Field field) { + public Id call( Field field ) { try { - UniqueValueSet set = uniqueValueSerializationStrategy.load(collectionScope, fields); - UniqueValue value = set.getValue(field.getName()); + UniqueValueSet set = uniqueValueSerializationStrategy.load( collectionScope, fields ); + UniqueValue value = set.getValue( field.getName() ); Id id = value == null ? null : value.getEntityId(); return id; - } catch (ConnectionException e) { - logger.error("Failed to getIdField", e); - throw new RuntimeException(e); + } + catch ( ConnectionException e ) { + logger.error( "Failed to getIdField", e ); + throw new RuntimeException( e ); } } - }); + } ); } + @Override - public Observable<Entity> update(final Entity entity) { + public Observable<Entity> update( final Entity entity ) { - logger.debug("Starting update process"); + logger.debug( "Starting update process" ); //do our input validation - Preconditions.checkNotNull(entity, "Entity is required in the new stage of the mvcc write"); + Preconditions.checkNotNull( entity, "Entity is required in the new stage of the mvcc write" ); final Id entityId = entity.getId(); - ValidationUtils.verifyIdentity(entityId); + ValidationUtils.verifyIdentity( entityId ); // create our observable and start the write - CollectionIoEvent<Entity> writeData = new CollectionIoEvent<Entity>(collectionScope, entity); + CollectionIoEvent<Entity> writeData = new CollectionIoEvent<Entity>( collectionScope, entity ); - Observable<CollectionIoEvent<MvccEntity>> observable = stageRunner(writeData, writeUpdate); + Observable<CollectionIoEvent<MvccEntity>> observable = stageRunner( writeData, writeUpdate ); - return observable.map(writeCommit).doOnNext(new Action1<Entity>() { + return observable.map( writeCommit ).doOnNext( new Action1<Entity>() { @Override - public void call(final Entity entity) { - logger.debug("sending entity to the queue"); + public void call( final Entity entity ) { + logger.debug( "sending entity to the queue" ); //we an update, signal the fix @@ -284,57 +284,56 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager { } - }).doOnError(rollback); + } ).doOnError( rollback ); } // fire the stages - public Observable<CollectionIoEvent<MvccEntity>> stageRunner(CollectionIoEvent<Entity> writeData, - WriteStart writeState) { + public Observable<CollectionIoEvent<MvccEntity>> stageRunner( CollectionIoEvent<Entity> writeData, + WriteStart writeState ) { - return Observable.from(writeData).map(writeState).doOnNext( - new Action1<CollectionIoEvent<MvccEntity>>() { + return Observable.from( writeData ).map( writeState ).doOnNext( new Action1<CollectionIoEvent<MvccEntity>>() { - @Override - public void call(final CollectionIoEvent<MvccEntity> mvccEntityCollectionIoEvent) { + @Override + public void call( final CollectionIoEvent<MvccEntity> mvccEntityCollectionIoEvent ) { - Observable<CollectionIoEvent<MvccEntity>> unique = - Observable.from(mvccEntityCollectionIoEvent).subscribeOn(Schedulers.io()) - .doOnNext(writeVerifyUnique); + Observable<CollectionIoEvent<MvccEntity>> unique = + Observable.from( mvccEntityCollectionIoEvent ).subscribeOn( Schedulers.io() ) + .doOnNext( writeVerifyUnique ); - // optimistic verification - Observable<CollectionIoEvent<MvccEntity>> optimistic = - Observable.from(mvccEntityCollectionIoEvent).subscribeOn(Schedulers.io()) - .doOnNext(writeOptimisticVerify); + // optimistic verification + Observable<CollectionIoEvent<MvccEntity>> optimistic = + Observable.from( mvccEntityCollectionIoEvent ).subscribeOn( Schedulers.io() ) + .doOnNext( writeOptimisticVerify ); - //wait for both to finish - Observable.merge(unique, optimistic).toBlocking().last(); - } - }); + //wait for both to finish + Observable.merge( unique, optimistic ).toBlocking().last(); + } + } ); } @Override - public Observable<VersionSet> getLatestVersion(final Collection<Id> entityIds) { + public Observable<VersionSet> getLatestVersion( final Collection<Id> entityIds ) { - return Observable.create(new Observable.OnSubscribe<VersionSet>() { + return Observable.create( new Observable.OnSubscribe<VersionSet>() { @Override - public void call(final Subscriber<? super VersionSet> subscriber) { + public void call( final Subscriber<? super VersionSet> subscriber ) { try { final VersionSet logEntries = mvccLogEntrySerializationStrategy - .load(collectionScope, entityIds, UUIDGenerator.newTimeUUID()); + .load( collectionScope, entityIds, UUIDGenerator.newTimeUUID() ); - subscriber.onNext(logEntries); + subscriber.onNext( logEntries ); subscriber.onCompleted(); - - } catch (Exception e) { - subscriber.onError(e); + } + catch ( Exception e ) { + subscriber.onError( e ); } } - }); + } ); } @@ -342,25 +341,21 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager { public Health getHealth() { try { - ColumnFamily<String, String> CF_SYSTEM_LOCAL = new ColumnFamily<String, String>( - "system.local", - StringSerializer.get(), - StringSerializer.get(), - StringSerializer.get()); + ColumnFamily<String, String> CF_SYSTEM_LOCAL = + new ColumnFamily<String, String>( "system.local", StringSerializer.get(), StringSerializer.get(), + StringSerializer.get() ); - OperationResult<CqlResult<String, String>> result = keyspace.prepareQuery(CF_SYSTEM_LOCAL) - .withCql("SELECT now() FROM system.local;") - .execute(); + OperationResult<CqlResult<String, String>> result = + keyspace.prepareQuery( CF_SYSTEM_LOCAL ).withCql( "SELECT now() FROM system.local;" ).execute(); if ( result.getResult().getRows().size() == 1 ) { return Health.GREEN; } - - } catch ( ConnectionException ex ) { - logger.error("Error connecting to Cassandra", ex); + } + catch ( ConnectionException ex ) { + logger.error( "Error connecting to Cassandra", ex ); } return Health.RED; } - } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2bd1c950/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/MvccEntitySerializationStrategy.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/MvccEntitySerializationStrategy.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/MvccEntitySerializationStrategy.java index b9277eb..93288af 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/MvccEntitySerializationStrategy.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/MvccEntitySerializationStrategy.java @@ -26,6 +26,7 @@ import java.util.UUID; import org.apache.usergrid.persistence.collection.CollectionScope; import org.apache.usergrid.persistence.collection.EntitySet; import org.apache.usergrid.persistence.collection.MvccEntity; +import org.apache.usergrid.persistence.core.migration.schema.Migration; import org.apache.usergrid.persistence.model.entity.Id; import com.netflix.astyanax.MutationBatch; @@ -34,7 +35,7 @@ import com.netflix.astyanax.MutationBatch; /** * The interface that allows us to serialize an entity to disk */ -public interface MvccEntitySerializationStrategy { +public interface MvccEntitySerializationStrategy extends Migration { /** * Serialize the entity to the data store with the given collection context http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2bd1c950/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/MvccLogEntrySerializationStrategy.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/MvccLogEntrySerializationStrategy.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/MvccLogEntrySerializationStrategy.java index 927a60c..4baef84 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/MvccLogEntrySerializationStrategy.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/MvccLogEntrySerializationStrategy.java @@ -26,6 +26,7 @@ import java.util.UUID; import org.apache.usergrid.persistence.collection.CollectionScope; import org.apache.usergrid.persistence.collection.MvccLogEntry; import org.apache.usergrid.persistence.collection.VersionSet; +import org.apache.usergrid.persistence.core.migration.schema.Migration; import org.apache.usergrid.persistence.model.entity.Id; import com.netflix.astyanax.MutationBatch; @@ -34,7 +35,7 @@ import com.netflix.astyanax.MutationBatch; /** * The interface that allows us to serialize a log entry to disk */ -public interface MvccLogEntrySerializationStrategy { +public interface MvccLogEntrySerializationStrategy extends Migration { /** * Serialize the entity to the data store with the given collection context http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2bd1c950/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkCommit.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkCommit.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkCommit.java index 9e2d52c..5bcb9f8 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkCommit.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkCommit.java @@ -39,6 +39,7 @@ import org.apache.usergrid.persistence.collection.serialization.SerializationFig import org.apache.usergrid.persistence.collection.serialization.UniqueValue; import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy; import org.apache.usergrid.persistence.collection.serialization.impl.UniqueValueImpl; +import org.apache.usergrid.persistence.core.guice.ProxyImpl; import org.apache.usergrid.persistence.core.rx.ObservableIterator; import org.apache.usergrid.persistence.model.entity.Entity; import org.apache.usergrid.persistence.model.entity.Id; @@ -72,7 +73,7 @@ public class MarkCommit implements Action1<CollectionIoEvent<MvccEntity>> { @Inject public MarkCommit( final MvccLogEntrySerializationStrategy logStrat, - final MvccEntitySerializationStrategy entityStrat, + @ProxyImpl final MvccEntitySerializationStrategy entityStrat, final UniqueValueSerializationStrategy uniqueValueStrat, final SerializationFig serializationFig, final Keyspace keyspace ) { http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2bd1c950/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java index 6a17197..d3c8193 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java @@ -37,6 +37,7 @@ import org.apache.usergrid.persistence.collection.serialization.UniqueValue; import org.apache.usergrid.persistence.collection.serialization.impl.UniqueValueImpl; import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy; import org.apache.usergrid.persistence.collection.util.EntityUtils; +import org.apache.usergrid.persistence.core.guice.ProxyImpl; import org.apache.usergrid.persistence.core.util.ValidationUtils; import org.apache.usergrid.persistence.model.entity.Entity; import org.apache.usergrid.persistence.model.entity.Id; @@ -70,7 +71,7 @@ public class WriteCommit implements Func1<CollectionIoEvent<MvccEntity>, Entity> @Inject public WriteCommit( final MvccLogEntrySerializationStrategy logStrat, - final MvccEntitySerializationStrategy entryStrat, + @ProxyImpl final MvccEntitySerializationStrategy entryStrat, final UniqueValueSerializationStrategy uniqueValueStrat) { Preconditions.checkNotNull( logStrat, "MvccLogEntrySerializationStrategy is required" ); http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2bd1c950/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/UniqueValueSerializationStrategy.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/UniqueValueSerializationStrategy.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/UniqueValueSerializationStrategy.java index 4ceb407..030d9d1 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/UniqueValueSerializationStrategy.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/UniqueValueSerializationStrategy.java @@ -23,13 +23,14 @@ import java.util.Collection; import com.netflix.astyanax.MutationBatch; import com.netflix.astyanax.connectionpool.exceptions.ConnectionException; import org.apache.usergrid.persistence.collection.CollectionScope; +import org.apache.usergrid.persistence.core.migration.schema.Migration; import org.apache.usergrid.persistence.model.field.Field; /** * Reads and writes to UniqueValues column family. */ -public interface UniqueValueSerializationStrategy { +public interface UniqueValueSerializationStrategy extends Migration { /** * Write the specified UniqueValue to Cassandra with optional timeToLive in milliseconds. http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2bd1c950/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyImpl.java index bbaeb4a..1ec027f 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyImpl.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyImpl.java @@ -18,9 +18,7 @@ package org.apache.usergrid.persistence.collection.serialization.impl; -import java.nio.ByteBuffer; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.Iterator; @@ -54,21 +52,15 @@ import org.apache.usergrid.persistence.core.migration.schema.Migration; import org.apache.usergrid.persistence.model.entity.Entity; import org.apache.usergrid.persistence.model.entity.Id; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.dataformat.smile.SmileFactory; import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.inject.Inject; -import com.google.inject.Singleton; import com.netflix.astyanax.ColumnListMutation; import com.netflix.astyanax.Keyspace; import com.netflix.astyanax.MutationBatch; import com.netflix.astyanax.connectionpool.exceptions.ConnectionException; import com.netflix.astyanax.model.Column; import com.netflix.astyanax.model.ColumnList; -import com.netflix.astyanax.model.CompositeBuilder; -import com.netflix.astyanax.model.CompositeParser; -import com.netflix.astyanax.model.Composites; import com.netflix.astyanax.model.Row; import com.netflix.astyanax.query.RowQuery; import com.netflix.astyanax.serializers.AbstractSerializer; @@ -80,18 +72,13 @@ import com.netflix.astyanax.serializers.UUIDSerializer; /** * @author tnine */ -@Singleton -public class MvccEntitySerializationStrategyImpl implements MvccEntitySerializationStrategy, Migration { +public abstract class MvccEntitySerializationStrategyImpl implements MvccEntitySerializationStrategy { private static final Logger log = LoggerFactory.getLogger( MvccLogEntrySerializationStrategyImpl.class ); - private static final EntitySerializer ENTITY_JSON_SER = new EntitySerializer(); - private static final IdRowCompositeSerializer ID_SER = IdRowCompositeSerializer.get(); - - private static final ByteBufferSerializer BUFFER_SERIALIZER = ByteBufferSerializer.get(); - private static final BytesArraySerializer BYTES_ARRAY_SERIALIZER = BytesArraySerializer.get(); + private static final IdRowCompositeSerializer ID_SER = IdRowCompositeSerializer.get(); private static final CollectionScopedRowKeySerializer<Id> ROW_KEY_SER = @@ -104,6 +91,7 @@ public class MvccEntitySerializationStrategyImpl implements MvccEntitySerializat protected final Keyspace keyspace; protected final SerializationFig serializationFig; protected final EntityRepair repair; + private final AbstractSerializer<EntityWrapper> entityJsonSerializer; @Inject @@ -111,6 +99,7 @@ public class MvccEntitySerializationStrategyImpl implements MvccEntitySerializat this.keyspace = keyspace; this.serializationFig = serializationFig; this.repair = new EntityRepairImpl( this, serializationFig ); + this.entityJsonSerializer = getEntitySerializer(); } @@ -126,7 +115,7 @@ public class MvccEntitySerializationStrategyImpl implements MvccEntitySerializat @Override public void doOp( final ColumnListMutation<UUID> colMutation ) { try { - colMutation.putColumn( colName, ENTITY_JSON_SER + colMutation.putColumn( colName, entityJsonSerializer .toByteBuffer( new EntityWrapper( entity.getStatus(), entity.getEntity() ) ) ); } catch ( Exception e ) { @@ -207,7 +196,7 @@ public class MvccEntitySerializationStrategyImpl implements MvccEntitySerializat final Column<UUID> column = columns.getColumnByIndex( 0 ); - final MvccEntity parsedEntity = new MvccColumnParser( entityId ).parseColumn( column ); + final MvccEntity parsedEntity = new MvccColumnParser( entityId, entityJsonSerializer ).parseColumn( column ); //we *might* need to repair, it's not clear so check before loading into result sets final MvccEntity maybeRepaired = repair.maybeRepair( collectionScope, parsedEntity ); @@ -245,7 +234,7 @@ public class MvccEntitySerializationStrategyImpl implements MvccEntitySerializat keyspace.prepareQuery( CF_ENTITY_DATA ).getKey( rowKey ) .withColumnRange( version, null, false, fetchSize ); - return new ColumnNameIterator( query, new MvccColumnParser( entityId ), false ); + return new ColumnNameIterator( query, new MvccColumnParser( entityId, entityJsonSerializer ), false ); } @@ -275,7 +264,7 @@ public class MvccEntitySerializationStrategyImpl implements MvccEntitySerializat keyspace.prepareQuery( CF_ENTITY_DATA ).getKey( rowKey ) .withColumnRange( null, version, true, fetchSize ); - return new ColumnNameIterator( query, new MvccColumnParser( entityId ), false ); + return new ColumnNameIterator( query, new MvccColumnParser( entityId, entityJsonSerializer ), false ); } @@ -290,7 +279,7 @@ public class MvccEntitySerializationStrategyImpl implements MvccEntitySerializat return doWrite( collectionScope, entityId, new RowOp() { @Override public void doOp( final ColumnListMutation<UUID> colMutation ) { - colMutation.putColumn( version, ENTITY_JSON_SER + colMutation.putColumn( version, entityJsonSerializer .toByteBuffer( new EntityWrapper( MvccEntity.Status.COMPLETE, Optional.<Entity>absent() ) ) ); } } ); @@ -367,12 +356,12 @@ public class MvccEntitySerializationStrategyImpl implements MvccEntitySerializat /** * Simple bean wrapper for state and entity */ - private static class EntityWrapper { - private final MvccEntity.Status status; - private final Optional<Entity> entity; + protected static class EntityWrapper { + protected final MvccEntity.Status status; + protected final Optional<Entity> entity; - private EntityWrapper( final MvccEntity.Status status, final Optional<Entity> entity ) { + protected EntityWrapper( final MvccEntity.Status status, final Optional<Entity> entity ) { this.status = status; this.entity = entity; } @@ -385,10 +374,12 @@ public class MvccEntitySerializationStrategyImpl implements MvccEntitySerializat private static final class MvccColumnParser implements ColumnParser<UUID, MvccEntity> { private final Id id; + private final AbstractSerializer<EntityWrapper> entityJsonSerializer; - private MvccColumnParser( Id id ) { + private MvccColumnParser( Id id, final AbstractSerializer<EntityWrapper> entityJsonSerializer ) { this.id = id; + this.entityJsonSerializer = entityJsonSerializer; } @@ -399,7 +390,7 @@ public class MvccEntitySerializationStrategyImpl implements MvccEntitySerializat final UUID version = column.getName(); try { - deSerialized = column.getValue( ENTITY_JSON_SER ); + deSerialized = column.getValue( entityJsonSerializer ); } catch ( DataCorruptionException e ) { log.error( @@ -421,128 +412,9 @@ public class MvccEntitySerializationStrategyImpl implements MvccEntitySerializat } - public static class EntitySerializer extends AbstractSerializer<EntityWrapper> { - - - public static final SmileFactory f = new SmileFactory(); - - public static ObjectMapper mapper; - - private static byte[] STATE_COMPLETE = new byte[] { 0 }; - private static byte[] STATE_DELETED = new byte[] { 1 }; - private static byte[] STATE_PARTIAL = new byte[] { 2 }; - - private static byte[] VERSION = new byte[] { 0 }; - - - - public EntitySerializer() { - try { - mapper = new ObjectMapper( f ); - // mapper.enable(SerializationFeature.INDENT_OUTPUT); don't indent output, - // causes slowness - mapper.enableDefaultTypingAsProperty( ObjectMapper.DefaultTyping.JAVA_LANG_OBJECT, "@class" ); - } - catch ( Exception e ) { - throw new RuntimeException( "Error setting up mapper", e ); - } - } - - - @Override - public ByteBuffer toByteBuffer( final EntityWrapper wrapper ) { - if ( wrapper == null ) { - return null; - } - - CompositeBuilder builder = Composites.newCompositeBuilder(); - - builder.addBytes( VERSION ); - - //mark this version as empty - if ( !wrapper.entity.isPresent() ) { - //we're empty - builder.addBytes( STATE_DELETED ); - - return builder.build(); - } - - //we have an entity - - if ( wrapper.status == MvccEntity.Status.COMPLETE ) { - builder.addBytes( STATE_COMPLETE ); - } - - else { - builder.addBytes( STATE_PARTIAL ); - } - - try { - final byte[] entityBytes = mapper.writeValueAsBytes( wrapper.entity.get() ) ; - builder.addBytes( entityBytes ); - } - catch ( Exception e ) { - throw new RuntimeException( "Unable to serialize entity", e ); - } - - return builder.build(); - } - - - @Override - public EntityWrapper fromByteBuffer( final ByteBuffer byteBuffer ) { - - /** - * We intentionally turn data corruption exceptions when we're unable to de-serialize - * the data in cassandra. If this occurs, we'll never be able to de-serialize it - * and it should be considered lost. This is an error that is occuring due to a bug - * in serializing the entity. This is a lazy recognition + repair signal for deployment with - * existing systems. - */ - CompositeParser parser; - try { - parser = Composites.newCompositeParser( byteBuffer ); - } - catch ( Exception e ) { - throw new DataCorruptionException("Unable to de-serialze entity", e); - } - - byte[] version = parser.read( BYTES_ARRAY_SERIALIZER ); - - if ( !Arrays.equals( VERSION, version ) ) { - throw new UnsupportedOperationException( "A version of type " + version + " is unsupported" ); - } - - byte[] state = parser.read( BYTES_ARRAY_SERIALIZER ); - - // it's been deleted, remove it - - if ( Arrays.equals( STATE_DELETED, state ) ) { - return new EntityWrapper( MvccEntity.Status.COMPLETE, Optional.<Entity>absent() ); - } - - Entity storedEntity; - - ByteBuffer jsonBytes = parser.read( BUFFER_SERIALIZER ); - byte[] array = jsonBytes.array(); - int start = jsonBytes.arrayOffset(); - int length = jsonBytes.remaining(); - - try { - storedEntity = mapper.readValue( array, start, length, Entity.class ); - } - catch ( Exception e ) { - throw new DataCorruptionException( "Unable to read entity data", e ); - } - - final Optional<Entity> entity = Optional.of( storedEntity ); - - if ( Arrays.equals( STATE_COMPLETE, state ) ) { - return new EntityWrapper( MvccEntity.Status.COMPLETE, entity ); - } - - // it's partial by default - return new EntityWrapper( MvccEntity.Status.PARTIAL, entity ); - } - } + /** + * Return the entity serializer for this instance + * @return + */ + protected abstract AbstractSerializer<EntityWrapper> getEntitySerializer(); } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2bd1c950/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyProxyImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyProxyImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyProxyImpl.java new file mode 100644 index 0000000..a9e01b1 --- /dev/null +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyProxyImpl.java @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.persistence.collection.serialization.impl; + + +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.UUID; + +import org.apache.usergrid.persistence.collection.CollectionScope; +import org.apache.usergrid.persistence.collection.EntitySet; +import org.apache.usergrid.persistence.collection.MvccEntity; +import org.apache.usergrid.persistence.collection.mvcc.MvccEntitySerializationStrategy; +import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamilyDefinition; +import org.apache.usergrid.persistence.core.guice.CurrentImpl; +import org.apache.usergrid.persistence.core.guice.PreviousImpl; +import org.apache.usergrid.persistence.core.migration.data.DataMigrationManager; +import org.apache.usergrid.persistence.model.entity.Id; + +import com.google.inject.Inject; +import com.google.inject.Singleton; +import com.netflix.astyanax.Keyspace; +import com.netflix.astyanax.MutationBatch; + + +/** + * Version 3 implementation of entity serialization. This will proxy writes and reads so that during + * migration data goes to both sources and is read from the old source. After the ugprade completes, + * it will be available from the new source + */ +@Singleton +public class MvccEntitySerializationStrategyProxyImpl implements MvccEntitySerializationStrategy { + + + public static final int MIGRATION_VERSION = 3; + + private final DataMigrationManager dataMigrationManager; + private final Keyspace keyspace; + private final MvccEntitySerializationStrategy previous; + private final MvccEntitySerializationStrategy current; + + + @Inject + public MvccEntitySerializationStrategyProxyImpl( final DataMigrationManager dataMigrationManager, + final Keyspace keyspace, + @PreviousImpl final MvccEntitySerializationStrategy previous, + @CurrentImpl final MvccEntitySerializationStrategy current ) { + this.dataMigrationManager = dataMigrationManager; + this.keyspace = keyspace; + this.previous = previous; + this.current = current; + } + + + @Override + public MutationBatch write( final CollectionScope context, final MvccEntity entity ) { + if ( isOldVersion() ) { + final MutationBatch aggregateBatch = keyspace.prepareMutationBatch(); + + aggregateBatch.mergeShallow( previous.write( context, entity ) ); + aggregateBatch.mergeShallow( current.write( context, entity ) ); + + return aggregateBatch; + } + + return current.write( context, entity ); + } + + + @Override + public EntitySet load( final CollectionScope scope, final Collection<Id> entityIds, final UUID maxVersion ) { + if ( isOldVersion() ) { + return previous.load( scope, entityIds, maxVersion ); + } + + return current.load( scope, entityIds, maxVersion ); + } + + + @Override + public Iterator<MvccEntity> load( final CollectionScope context, final Id entityId, final UUID version, + final int fetchSize ) { + if ( isOldVersion() ) { + return previous.load( context, entityId, version, fetchSize ); + } + + return current.load( context, entityId, version, fetchSize ); + } + + + @Override + public Iterator<MvccEntity> loadHistory( final CollectionScope context, final Id entityId, final UUID version, + final int fetchSize ) { + if ( isOldVersion() ) { + return previous.loadHistory( context, entityId, version, fetchSize ); + } + + return current.loadHistory( context, entityId, version, fetchSize ); + } + + + @Override + public MutationBatch mark( final CollectionScope context, final Id entityId, final UUID version ) { + if ( isOldVersion() ) { + final MutationBatch aggregateBatch = keyspace.prepareMutationBatch(); + + aggregateBatch.mergeShallow( previous.mark( context, entityId, version ) ); + aggregateBatch.mergeShallow( current.mark( context, entityId, version ) ); + + return aggregateBatch; + } + + return current.mark( context, entityId, version ); + } + + + @Override + public MutationBatch delete( final CollectionScope context, final Id entityId, final UUID version ) { + if ( isOldVersion() ) { + final MutationBatch aggregateBatch = keyspace.prepareMutationBatch(); + + aggregateBatch.mergeShallow( previous.delete( context, entityId, version ) ); + aggregateBatch.mergeShallow( current.delete( context, entityId, version ) ); + + return aggregateBatch; + } + + return current.delete( context, entityId, version ); + } + + + /** + * Return true if we're on an old version + */ + private boolean isOldVersion() { + return dataMigrationManager.getCurrentVersion() < MIGRATION_VERSION; + } + + + @Override + public Collection<MultiTennantColumnFamilyDefinition> getColumnFamilies() { + return Collections.emptyList(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2bd1c950/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV1Impl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV1Impl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV1Impl.java new file mode 100644 index 0000000..49b3486 --- /dev/null +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV1Impl.java @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.usergrid.persistence.collection.serialization.impl; + + +import java.nio.ByteBuffer; +import java.util.Arrays; + +import org.apache.usergrid.persistence.collection.MvccEntity; +import org.apache.usergrid.persistence.collection.exception.DataCorruptionException; +import org.apache.usergrid.persistence.collection.serialization.SerializationFig; +import org.apache.usergrid.persistence.model.entity.Entity; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.dataformat.smile.SmileFactory; +import com.google.common.base.Optional; +import com.google.inject.Inject; +import com.google.inject.Singleton; +import com.netflix.astyanax.Keyspace; +import com.netflix.astyanax.model.CompositeBuilder; +import com.netflix.astyanax.model.CompositeParser; +import com.netflix.astyanax.model.Composites; +import com.netflix.astyanax.serializers.AbstractSerializer; +import com.netflix.astyanax.serializers.ByteBufferSerializer; +import com.netflix.astyanax.serializers.BytesArraySerializer; + + +/** + * Version 1 implementation of entity serialization + */ +@Singleton +public class MvccEntitySerializationStrategyV1Impl extends MvccEntitySerializationStrategyImpl { + + private static final EntitySerializer ENTITY_JSON_SER = new EntitySerializer(); + + + @Inject + public MvccEntitySerializationStrategyV1Impl( final Keyspace keyspace, final SerializationFig serializationFig ) { + super( keyspace, serializationFig ); + } + + + @Override + protected AbstractSerializer<MvccEntitySerializationStrategyImpl.EntityWrapper> getEntitySerializer() { + return ENTITY_JSON_SER; + } + + + public static class EntitySerializer extends AbstractSerializer<EntityWrapper> { + + + private static final ByteBufferSerializer BUFFER_SERIALIZER = ByteBufferSerializer.get(); + + private static final BytesArraySerializer BYTES_ARRAY_SERIALIZER = BytesArraySerializer.get(); + + + public static final SmileFactory f = new SmileFactory(); + + public static ObjectMapper mapper; + + private static byte[] STATE_COMPLETE = new byte[] { 0 }; + private static byte[] STATE_DELETED = new byte[] { 1 }; + private static byte[] STATE_PARTIAL = new byte[] { 2 }; + + private static byte[] VERSION = new byte[] { 0 }; + + + public EntitySerializer() { + try { + mapper = new ObjectMapper( f ); + // mapper.enable(SerializationFeature.INDENT_OUTPUT); don't indent output, + // causes slowness + mapper.enableDefaultTypingAsProperty( ObjectMapper.DefaultTyping.JAVA_LANG_OBJECT, "@class" ); + } + catch ( Exception e ) { + throw new RuntimeException( "Error setting up mapper", e ); + } + } + + + @Override + public ByteBuffer toByteBuffer( final EntityWrapper wrapper ) { + if ( wrapper == null ) { + return null; + } + + CompositeBuilder builder = Composites.newCompositeBuilder(); + + builder.addBytes( VERSION ); + + //mark this version as empty + if ( !wrapper.entity.isPresent() ) { + //we're empty + builder.addBytes( STATE_DELETED ); + + return builder.build(); + } + + //we have an entity + + if ( wrapper.status == MvccEntity.Status.COMPLETE ) { + builder.addBytes( STATE_COMPLETE ); + } + + else { + builder.addBytes( STATE_PARTIAL ); + } + + try { + final byte[] entityBytes = mapper.writeValueAsBytes( wrapper.entity.get() ); + builder.addBytes( entityBytes ); + } + catch ( Exception e ) { + throw new RuntimeException( "Unable to serialize entity", e ); + } + + return builder.build(); + } + + + @Override + public EntityWrapper fromByteBuffer( final ByteBuffer byteBuffer ) { + + /** + * We intentionally turn data corruption exceptions when we're unable to de-serialize + * the data in cassandra. If this occurs, we'll never be able to de-serialize it + * and it should be considered lost. This is an error that is occuring due to a bug + * in serializing the entity. This is a lazy recognition + repair signal for deployment with + * existing systems. + */ + CompositeParser parser; + try { + parser = Composites.newCompositeParser( byteBuffer ); + } + catch ( Exception e ) { + throw new DataCorruptionException( "Unable to de-serialze entity", e ); + } + + byte[] version = parser.read( BYTES_ARRAY_SERIALIZER ); + + if ( !Arrays.equals( VERSION, version ) ) { + throw new UnsupportedOperationException( "A version of type " + version + " is unsupported" ); + } + + byte[] state = parser.read( BYTES_ARRAY_SERIALIZER ); + + // it's been deleted, remove it + + if ( Arrays.equals( STATE_DELETED, state ) ) { + return new EntityWrapper( MvccEntity.Status.COMPLETE, Optional.<Entity>absent() ); + } + + Entity storedEntity; + + ByteBuffer jsonBytes = parser.read( BUFFER_SERIALIZER ); + byte[] array = jsonBytes.array(); + int start = jsonBytes.arrayOffset(); + int length = jsonBytes.remaining(); + + try { + storedEntity = mapper.readValue( array, start, length, Entity.class ); + } + catch ( Exception e ) { + throw new DataCorruptionException( "Unable to read entity data", e ); + } + + final Optional<Entity> entity = Optional.of( storedEntity ); + + if ( Arrays.equals( STATE_COMPLETE, state ) ) { + return new EntityWrapper( MvccEntity.Status.COMPLETE, entity ); + } + + // it's partial by default + return new EntityWrapper( MvccEntity.Status.PARTIAL, entity ); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2bd1c950/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV2Impl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV2Impl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV2Impl.java new file mode 100644 index 0000000..f80960d --- /dev/null +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV2Impl.java @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.persistence.collection.serialization.impl; + + +import java.nio.ByteBuffer; +import java.util.Arrays; + +import org.apache.usergrid.persistence.collection.MvccEntity; +import org.apache.usergrid.persistence.collection.exception.DataCorruptionException; +import org.apache.usergrid.persistence.collection.serialization.SerializationFig; +import org.apache.usergrid.persistence.model.entity.Entity; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.dataformat.smile.SmileFactory; +import com.google.common.base.Optional; +import com.google.inject.Inject; +import com.google.inject.Singleton; +import com.netflix.astyanax.Keyspace; +import com.netflix.astyanax.model.CompositeBuilder; +import com.netflix.astyanax.model.CompositeParser; +import com.netflix.astyanax.model.Composites; +import com.netflix.astyanax.serializers.AbstractSerializer; +import com.netflix.astyanax.serializers.ByteBufferSerializer; +import com.netflix.astyanax.serializers.BytesArraySerializer; + + +/** + * Version 1 implementation of entity serialization + */ +@Singleton +public class MvccEntitySerializationStrategyV2Impl extends MvccEntitySerializationStrategyImpl { + + private static final EntitySerializer ENTITY_JSON_SER = new EntitySerializer(); + + + @Inject + public MvccEntitySerializationStrategyV2Impl( final Keyspace keyspace, final SerializationFig serializationFig ) { + super( keyspace, serializationFig ); + } + + + @Override + protected AbstractSerializer<EntityWrapper> getEntitySerializer() { + return ENTITY_JSON_SER; + } + + + public static class EntitySerializer extends AbstractSerializer<EntityWrapper> { + + + private static final ByteBufferSerializer BUFFER_SERIALIZER = ByteBufferSerializer.get(); + + private static final BytesArraySerializer BYTES_ARRAY_SERIALIZER = BytesArraySerializer.get(); + + + public static final SmileFactory f = new SmileFactory(); + + public static ObjectMapper mapper; + + private static byte[] STATE_COMPLETE = new byte[] { 0 }; + private static byte[] STATE_DELETED = new byte[] { 1 }; + private static byte[] STATE_PARTIAL = new byte[] { 2 }; + + private static byte[] VERSION = new byte[] { 0 }; + + + public EntitySerializer() { + try { + mapper = new ObjectMapper( f ); + // mapper.enable(SerializationFeature.INDENT_OUTPUT); don't indent output, + // causes slowness + mapper.enableDefaultTypingAsProperty( ObjectMapper.DefaultTyping.JAVA_LANG_OBJECT, "@class" ); + } + catch ( Exception e ) { + throw new RuntimeException( "Error setting up mapper", e ); + } + } + + + @Override + public ByteBuffer toByteBuffer( final EntityWrapper wrapper ) { + if ( wrapper == null ) { + return null; + } + + CompositeBuilder builder = Composites.newCompositeBuilder(); + + builder.addBytes( VERSION ); + + //mark this version as empty + if ( !wrapper.entity.isPresent() ) { + //we're empty + builder.addBytes( STATE_DELETED ); + + return builder.build(); + } + + //we have an entity + + if ( wrapper.status == MvccEntity.Status.COMPLETE ) { + builder.addBytes( STATE_COMPLETE ); + } + + else { + builder.addBytes( STATE_PARTIAL ); + } + + try { + final byte[] entityBytes = mapper.writeValueAsBytes( wrapper.entity.get() ); + builder.addBytes( entityBytes ); + } + catch ( Exception e ) { + throw new RuntimeException( "Unable to serialize entity", e ); + } + + return builder.build(); + } + + + @Override + public EntityWrapper fromByteBuffer( final ByteBuffer byteBuffer ) { + + /** + * We intentionally turn data corruption exceptions when we're unable to de-serialize + * the data in cassandra. If this occurs, we'll never be able to de-serialize it + * and it should be considered lost. This is an error that is occuring due to a bug + * in serializing the entity. This is a lazy recognition + repair signal for deployment with + * existing systems. + */ + CompositeParser parser; + try { + parser = Composites.newCompositeParser( byteBuffer ); + } + catch ( Exception e ) { + throw new DataCorruptionException( "Unable to de-serialze entity", e ); + } + + byte[] version = parser.read( BYTES_ARRAY_SERIALIZER ); + + if ( !Arrays.equals( VERSION, version ) ) { + throw new UnsupportedOperationException( "A version of type " + version + " is unsupported" ); + } + + byte[] state = parser.read( BYTES_ARRAY_SERIALIZER ); + + // it's been deleted, remove it + + if ( Arrays.equals( STATE_DELETED, state ) ) { + return new EntityWrapper( MvccEntity.Status.COMPLETE, Optional.<Entity>absent() ); + } + + Entity storedEntity; + + ByteBuffer jsonBytes = parser.read( BUFFER_SERIALIZER ); + byte[] array = jsonBytes.array(); + int start = jsonBytes.arrayOffset(); + int length = jsonBytes.remaining(); + + try { + storedEntity = mapper.readValue( array, start, length, Entity.class ); + } + catch ( Exception e ) { + throw new DataCorruptionException( "Unable to read entity data", e ); + } + + final Optional<Entity> entity = Optional.of( storedEntity ); + + if ( Arrays.equals( STATE_COMPLETE, state ) ) { + return new EntityWrapper( MvccEntity.Status.COMPLETE, entity ); + } + + // it's partial by default + return new EntityWrapper( MvccEntity.Status.PARTIAL, entity ); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2bd1c950/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccLogEntrySerializationStrategyImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccLogEntrySerializationStrategyImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccLogEntrySerializationStrategyImpl.java index 7368e9a..831091d 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccLogEntrySerializationStrategyImpl.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccLogEntrySerializationStrategyImpl.java @@ -71,7 +71,7 @@ import com.netflix.astyanax.serializers.UUIDSerializer; * @author tnine */ @Singleton -public class MvccLogEntrySerializationStrategyImpl implements MvccLogEntrySerializationStrategy, Migration { +public class MvccLogEntrySerializationStrategyImpl implements MvccLogEntrySerializationStrategy { private static final Logger LOG = LoggerFactory.getLogger( MvccLogEntrySerializationStrategyImpl.class ); http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2bd1c950/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/SerializationModule.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/SerializationModule.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/SerializationModule.java index 8a4a9ac..eb9c374 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/SerializationModule.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/SerializationModule.java @@ -21,9 +21,13 @@ package org.apache.usergrid.persistence.collection.serialization.impl; import org.apache.usergrid.persistence.collection.mvcc.MvccEntitySerializationStrategy; import org.apache.usergrid.persistence.collection.mvcc.MvccLogEntrySerializationStrategy; import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy; +import org.apache.usergrid.persistence.core.guice.CurrentImpl; +import org.apache.usergrid.persistence.core.guice.PreviousImpl; +import org.apache.usergrid.persistence.core.guice.ProxyImpl; import org.apache.usergrid.persistence.core.migration.schema.Migration; import com.google.inject.AbstractModule; +import com.google.inject.Key; import com.google.inject.multibindings.Multibinder; @@ -37,14 +41,23 @@ public class SerializationModule extends AbstractModule { // bind the serialization strategies - bind( MvccEntitySerializationStrategy.class ).to( MvccEntitySerializationStrategyImpl.class ); + + //We've migrated this one, so we need to set up the previous, current, and proxy + bind( MvccEntitySerializationStrategy.class ).annotatedWith( PreviousImpl.class ) + .to( MvccEntitySerializationStrategyV1Impl.class ); + bind( MvccEntitySerializationStrategy.class ).annotatedWith( CurrentImpl.class ) + .to( MvccEntitySerializationStrategyV2Impl.class ); + bind( MvccEntitySerializationStrategy.class ).annotatedWith( ProxyImpl.class ) + .to( MvccEntitySerializationStrategyProxyImpl.class ); + bind( MvccLogEntrySerializationStrategy.class ).to( MvccLogEntrySerializationStrategyImpl.class ); bind( UniqueValueSerializationStrategy.class ).to( UniqueValueSerializationStrategyImpl.class ); //do multibindings for migrations Multibinder<Migration> uriBinder = Multibinder.newSetBinder( binder(), Migration.class ); - uriBinder.addBinding().to( MvccEntitySerializationStrategyImpl.class ); - uriBinder.addBinding().to( MvccLogEntrySerializationStrategyImpl.class ); - uriBinder.addBinding().to( UniqueValueSerializationStrategyImpl.class ); + uriBinder.addBinding().to( Key.get( MvccEntitySerializationStrategy.class, PreviousImpl.class ) ); + uriBinder.addBinding().to( Key.get( MvccEntitySerializationStrategy.class, CurrentImpl.class ) ); + uriBinder.addBinding().to( Key.get( MvccLogEntrySerializationStrategy.class ) ); + uriBinder.addBinding().to( Key.get( UniqueValueSerializationStrategy.class ) ); } } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2bd1c950/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImpl.java index b6b3cab..be95b08 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImpl.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImpl.java @@ -57,7 +57,7 @@ import com.netflix.astyanax.util.RangeBuilder; /** * Reads and writes to UniqueValues column family. */ -public class UniqueValueSerializationStrategyImpl implements UniqueValueSerializationStrategy, Migration { +public class UniqueValueSerializationStrategyImpl implements UniqueValueSerializationStrategy { private static final Logger log = LoggerFactory.getLogger( UniqueValueSerializationStrategyImpl.class ); http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2bd1c950/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerFactoryTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerFactoryTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerFactoryTest.java index f2deb3e..0d44e98 100644 --- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerFactoryTest.java +++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerFactoryTest.java @@ -6,7 +6,7 @@ import org.junit.Rule; import org.junit.Test; import org.junit.runner.RunWith; -import org.apache.usergrid.persistence.collection.guice.MigrationManagerRule; +import org.apache.usergrid.persistence.core.guice.MigrationManagerRule; import org.apache.usergrid.persistence.collection.guice.TestCollectionModule; import org.apache.usergrid.persistence.collection.impl.CollectionScopeImpl; import org.apache.usergrid.persistence.core.test.ITRunner; http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2bd1c950/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerIT.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerIT.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerIT.java index 66f162f..2d18675 100644 --- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerIT.java +++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerIT.java @@ -23,18 +23,19 @@ import java.util.Arrays; import java.util.List; import java.util.UUID; -import org.apache.usergrid.persistence.core.test.UseModules; import org.junit.Rule; import org.junit.Test; import org.junit.runner.RunWith; import org.apache.usergrid.persistence.collection.exception.WriteUniqueVerifyException; -import org.apache.usergrid.persistence.collection.guice.MigrationManagerRule; +import org.apache.usergrid.persistence.core.guice.MigrationManagerRule; import org.apache.usergrid.persistence.collection.guice.TestCollectionModule; import org.apache.usergrid.persistence.collection.impl.CollectionScopeImpl; import org.apache.usergrid.persistence.collection.mvcc.entity.Stage; import org.apache.usergrid.persistence.collection.serialization.SerializationFig; import org.apache.usergrid.persistence.core.test.ITRunner; +import org.apache.usergrid.persistence.core.test.UseModules; +import org.apache.usergrid.persistence.core.util.Health; import org.apache.usergrid.persistence.model.entity.Entity; import org.apache.usergrid.persistence.model.entity.Id; import org.apache.usergrid.persistence.model.entity.SimpleId; @@ -42,10 +43,10 @@ import org.apache.usergrid.persistence.model.field.BooleanField; import org.apache.usergrid.persistence.model.field.Field; import org.apache.usergrid.persistence.model.field.IntegerField; import org.apache.usergrid.persistence.model.field.StringField; +import org.apache.usergrid.persistence.model.util.UUIDGenerator; import com.fasterxml.uuid.UUIDComparator; import com.google.inject.Inject; -import org.apache.usergrid.persistence.core.util.Health; import rx.Observable; @@ -58,8 +59,8 @@ import static org.junit.Assert.fail; /** @author tnine */ -@RunWith(ITRunner.class) -@UseModules(TestCollectionModule.class) +@RunWith( ITRunner.class ) +@UseModules( TestCollectionModule.class ) public class EntityCollectionManagerIT { @Inject private EntityCollectionManagerFactory factory; @@ -533,7 +534,7 @@ public class EntityCollectionManagerIT { } - @Test(expected = IllegalArgumentException.class) + @Test( expected = IllegalArgumentException.class ) public void readTooLarge() { final CollectionScope context = @@ -630,8 +631,7 @@ public class EntityCollectionManagerIT { final UUID v2Version = v2Created.getVersion(); - assertTrue( "Newer version in v2", - UUIDComparator.staticCompare( v2Version, v1Version) > 0 ); + assertTrue( "Newer version in v2", UUIDComparator.staticCompare( v2Version, v1Version ) > 0 ); final VersionSet resultsV2 = manager.getLatestVersion( Arrays.asList( v1Created.getId() ) ).toBlocking().last(); @@ -678,12 +678,9 @@ public class EntityCollectionManagerIT { final UUID v2Version = v2Created.getVersion(); - - assertEquals( "Same entityId", v1Created.getId(), v2Created.getId() ); - assertTrue( "Newer version in v2", - UUIDComparator.staticCompare( v2Version, v1Version ) > 0 ); + assertTrue( "Newer version in v2", UUIDComparator.staticCompare( v2Version, v1Version ) > 0 ); final VersionSet resultsV2 = manager.getLatestVersion( Arrays.asList( v1Created.getId() ) ).toBlocking().last(); @@ -700,11 +697,60 @@ public class EntityCollectionManagerIT { @Test public void healthTest() { - CollectionScope context = new CollectionScopeImpl( - new SimpleId( "organization" ), new SimpleId( "test" ), "test" ); + CollectionScope context = + new CollectionScopeImpl( new SimpleId( "organization" ), new SimpleId( "test" ), "test" ); final EntityCollectionManager manager = factory.createCollectionManager( context ); assertEquals( Health.GREEN, manager.getHealth() ); } + + + /** + * Tests an entity with more than 65535 bytes worth of data + */ + @Test + public void largeEntityWriteRead() { + final int setSize = 65535 * 2; + + int currentLength = 0; + + final Entity entity = new Entity( new SimpleId( "test" ) ); + + //generate a really large string value + StringBuilder builder = new StringBuilder(); + + for ( int i = 0; i < 100; i++ ) { + builder.append( UUIDGenerator.newTimeUUID().toString() ); + } + + final String value = builder.toString(); + + + //loop until our size is beyond the set size + for ( int i = 0; currentLength < setSize; i++ ) { + final String key = "newStringField" + i; + + entity.setField( new StringField( key, value ) ); + + currentLength += key.length() + value.length(); + } + + + //now we have one massive, entity, save it and retrieve it. + CollectionScope context = + new CollectionScopeImpl( new SimpleId( "organization" ), new SimpleId( "test" ), "test" ); + + final EntityCollectionManager manager = factory.createCollectionManager( context ); + + final Entity saved = manager.write( entity ).toBlocking().last(); + + + assertEquals( entity, saved ); + + //now load it + final Entity loaded = manager.load( entity.getId() ).toBlocking().last(); + + assertEquals( entity, loaded ); + } } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2bd1c950/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerStressTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerStressTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerStressTest.java index d4f4d81..11ad389 100644 --- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerStressTest.java +++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerStressTest.java @@ -31,7 +31,7 @@ import org.slf4j.LoggerFactory; import org.apache.commons.lang3.time.StopWatch; -import org.apache.usergrid.persistence.collection.guice.MigrationManagerRule; +import org.apache.usergrid.persistence.core.guice.MigrationManagerRule; import org.apache.usergrid.persistence.collection.guice.TestCollectionModule; import org.apache.usergrid.persistence.collection.impl.CollectionScopeImpl; import org.apache.usergrid.persistence.core.test.ITRunner; http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2bd1c950/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerSyncIT.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerSyncIT.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerSyncIT.java index ec35ed2..d4881de 100644 --- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerSyncIT.java +++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerSyncIT.java @@ -6,7 +6,7 @@ import org.junit.Rule; import org.junit.Test; import org.junit.runner.RunWith; -import org.apache.usergrid.persistence.collection.guice.MigrationManagerRule; +import org.apache.usergrid.persistence.core.guice.MigrationManagerRule; import org.apache.usergrid.persistence.collection.guice.TestCollectionModule; import org.apache.usergrid.persistence.collection.impl.CollectionScopeImpl; import org.apache.usergrid.persistence.core.test.ITRunner; http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2bd1c950/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/guice/MigrationManagerRule.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/guice/MigrationManagerRule.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/guice/MigrationManagerRule.java deleted file mode 100644 index 6b02b63..0000000 --- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/guice/MigrationManagerRule.java +++ /dev/null @@ -1,38 +0,0 @@ -package org.apache.usergrid.persistence.collection.guice; - - -import org.junit.rules.ExternalResource; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.usergrid.persistence.core.migration.schema.MigrationException; -import org.apache.usergrid.persistence.core.migration.schema.MigrationManager; - -import com.google.inject.Inject; -import com.google.inject.Singleton; - - -/** - */ -@Singleton -public class MigrationManagerRule extends ExternalResource { - private static final Logger LOG = LoggerFactory.getLogger( MigrationManagerRule.class ); - - private MigrationManager migrationManager; - - - @Inject - public void setMigrationManager( MigrationManager migrationManager ) { - this.migrationManager = migrationManager; - } - - - @Override - protected void before() throws MigrationException { - LOG.info( "Starting migration" ); - - migrationManager.migrate(); - - LOG.info( "Migration complete" ); - } -}
