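Added tests for when we have to make multiple trips to Cassandra: entity loads are now split into multiple requests when the expected result size exceeds the Thrift buffer size.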
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/56ce7ce6 Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/56ce7ce6 Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/56ce7ce6 Branch: refs/heads/two-dot-o Commit: 56ce7ce6df21b08282aa484ead6c3a19397f0d14 Parents: 3e625b1 Author: Todd Nine <[email protected]> Authored: Fri Nov 21 13:32:52 2014 -0700 Committer: Todd Nine <[email protected]> Committed: Fri Nov 21 13:32:52 2014 -0700 ---------------------------------------------------------------------- .../MvccEntitySerializationStrategyImpl.java | 132 ++++++++++++++----- .../MvccEntitySerializationStrategyV1Impl.java | 6 +- .../MvccEntitySerializationStrategyV2Impl.java | 5 +- .../MvccEntitySerializationStrategyV2Test.java | 101 +++++++++++--- 4 files changed, 188 insertions(+), 56 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/56ce7ce6/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyImpl.java index f3f4c13..c9ec9a8 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyImpl.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyImpl.java @@ -42,9 +42,9 @@ import 
org.apache.usergrid.persistence.collection.mvcc.entity.impl.MvccEntityImp import org.apache.usergrid.persistence.collection.serialization.EntityRepair; import org.apache.usergrid.persistence.collection.serialization.SerializationFig; import org.apache.usergrid.persistence.collection.util.EntityUtils; +import org.apache.usergrid.persistence.core.astyanax.CassandraFig; import org.apache.usergrid.persistence.core.astyanax.ColumnNameIterator; import org.apache.usergrid.persistence.core.astyanax.ColumnParser; -import org.apache.usergrid.persistence.core.astyanax.IdRowCompositeSerializer; import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamily; import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamilyDefinition; import org.apache.usergrid.persistence.core.astyanax.ScopedRowKey; @@ -61,9 +61,15 @@ import com.netflix.astyanax.connectionpool.exceptions.ConnectionException; import com.netflix.astyanax.model.Column; import com.netflix.astyanax.model.ColumnList; import com.netflix.astyanax.model.Row; +import com.netflix.astyanax.model.Rows; import com.netflix.astyanax.query.RowQuery; import com.netflix.astyanax.serializers.AbstractSerializer; -import com.netflix.astyanax.serializers.UUIDSerializer; + +import rx.Observable; +import rx.Scheduler; +import rx.functions.Func1; +import rx.functions.Func2; +import rx.schedulers.Schedulers; /** @@ -76,14 +82,17 @@ public abstract class MvccEntitySerializationStrategyImpl implements MvccEntityS protected final Keyspace keyspace; protected final SerializationFig serializationFig; + protected final CassandraFig cassandraFig; protected final EntityRepair repair; private final MultiTennantColumnFamily<ScopedRowKey<CollectionPrefixedKey<Id>>, UUID> columnFamily; @Inject - public MvccEntitySerializationStrategyImpl( final Keyspace keyspace, final SerializationFig serializationFig ) { + public MvccEntitySerializationStrategyImpl( final Keyspace keyspace, final SerializationFig serializationFig, + 
final CassandraFig cassandraFig ) { this.keyspace = keyspace; this.serializationFig = serializationFig; + this.cassandraFig = cassandraFig; this.repair = new EntityRepairImpl( this, serializationFig ); this.columnFamily = getColumnFamily(); } @@ -100,17 +109,8 @@ public abstract class MvccEntitySerializationStrategyImpl implements MvccEntityS return doWrite( collectionScope, entityId, new RowOp() { @Override public void doOp( final ColumnListMutation<UUID> colMutation ) { -// try { colMutation.putColumn( colName, getEntitySerializer() .toByteBuffer( new EntityWrapper( entity.getStatus(), entity.getEntity() ) ) ); -// } -// catch ( Exception e ) { -// // throw better exception if we can -// if ( entity != null || entity.getEntity().get() != null ) { -// throw new CollectionRuntimeException( entity, collectionScope, e ); -// } -// throw e; -// } } } ); } @@ -152,46 +152,110 @@ public abstract class MvccEntitySerializationStrategyImpl implements MvccEntityS rowKeys.add( rowKey ); } + /** + * Our settings may mean we exceed our maximum thrift buffer size. If we do, we have to make multiple requests, not just one. 
+ * Perform the calculations and the appropriate request patterns + * + */ + + final int maxEntityResultSizeInBytes = serializationFig.getMaxEntitySize() * entityIds.size(); - final Iterator<Row<ScopedRowKey<CollectionPrefixedKey<Id>>, UUID>> latestEntityColumns; + //if we're less than 1, set the number of requests to 1 + final int numberRequests = Math.max(1, maxEntityResultSizeInBytes / cassandraFig.getThriftBufferSize()); + final int entitiesPerRequest = entityIds.size() / numberRequests; - try { - latestEntityColumns = keyspace.prepareQuery( columnFamily ).getKeySlice( rowKeys ) - .withColumnRange( maxVersion, null, false, 1 ).execute().getResult() - .iterator(); + + final Scheduler scheduler; + + //if it's a single request, run it on the same thread + if(numberRequests == 1){ + scheduler = Schedulers.immediate(); } - catch ( ConnectionException e ) { - throw new CollectionRuntimeException( null, collectionScope, "An error occurred connecting to cassandra", - e ); + //if it's more than 1 request, run them on the I/O scheduler + else{ + scheduler = Schedulers.io(); } - final EntitySetImpl entitySetResults = new EntitySetImpl( entityIds.size() ); + final EntitySetImpl entitySetResults = Observable.from( rowKeys ) + //buffer our entities per request, then for that buffer, execute the query in parallel (if neccessary) + .buffer(entitiesPerRequest ) + .parallel( new Func1<Observable<List<ScopedRowKey + <CollectionPrefixedKey<Id>>>>, Observable<Rows<ScopedRowKey<CollectionPrefixedKey<Id>>, UUID>>>() { + + + @Override + public Observable<Rows<ScopedRowKey<CollectionPrefixedKey<Id>>, UUID>> call( + final Observable<List<ScopedRowKey<CollectionPrefixedKey<Id>>>> listObservable ) { + + + //here, we execute our query then emit the items either in parallel, or on the current thread if we have more than 1 request + return listObservable.map( new Func1<List<ScopedRowKey<CollectionPrefixedKey<Id>>>, + Rows<ScopedRowKey<CollectionPrefixedKey<Id>>, UUID>>() { + + + @Override + 
public Rows<ScopedRowKey<CollectionPrefixedKey<Id>>, UUID> call( + final List<ScopedRowKey<CollectionPrefixedKey<Id>>> scopedRowKeys ) { + + try { + return keyspace.prepareQuery( columnFamily ).getKeySlice( rowKeys ) + .withColumnRange( maxVersion, null, false, + 1 ).execute().getResult(); + } + catch ( ConnectionException e ) { + throw new CollectionRuntimeException( null, collectionScope, "An error occurred connecting to cassandra", + e ); + } + } + } ); - while ( latestEntityColumns.hasNext() ) { - final Row<ScopedRowKey<CollectionPrefixedKey<Id>>, UUID> row = latestEntityColumns.next(); - final ColumnList<UUID> columns = row.getColumns(); - if ( columns.size() == 0 ) { - continue; } + }, scheduler ) - final Id entityId = row.getKey().getKey().getSubKey(); + //reduce all the output into a single Entity set + .reduce( new EntitySetImpl( entityIds.size() ), + new Func2<EntitySetImpl, Rows<ScopedRowKey<CollectionPrefixedKey<Id>>, UUID>, EntitySetImpl>() { + @Override + public EntitySetImpl call( final EntitySetImpl entitySet, + final Rows<ScopedRowKey<CollectionPrefixedKey<Id>>, UUID> rows ) { - final Column<UUID> column = columns.getColumnByIndex( 0 ); + final Iterator<Row<ScopedRowKey<CollectionPrefixedKey<Id>>, UUID>> latestEntityColumns = rows.iterator(); - final MvccEntity parsedEntity = - new MvccColumnParser( entityId, getEntitySerializer() ).parseColumn( column ); + while ( latestEntityColumns.hasNext() ) { + final Row<ScopedRowKey<CollectionPrefixedKey<Id>>, UUID> row = latestEntityColumns.next(); - //we *might* need to repair, it's not clear so check before loading into result sets - final MvccEntity maybeRepaired = repair.maybeRepair( collectionScope, parsedEntity ); + final ColumnList<UUID> columns = row.getColumns(); - entitySetResults.addEntity( maybeRepaired ); - } + if ( columns.size() == 0 ) { + continue; + } + + final Id entityId = row.getKey().getKey().getSubKey(); + + final Column<UUID> column = columns.getColumnByIndex( 0 ); + + final 
MvccEntity parsedEntity = + new MvccColumnParser( entityId, getEntitySerializer() ).parseColumn( column ); + + //we *might* need to repair, it's not clear so check before loading into result sets + final MvccEntity maybeRepaired = repair.maybeRepair( collectionScope, parsedEntity ); + + entitySet.addEntity( maybeRepaired ); + } + + + + return entitySet; + } + } ).toBlocking().last(); return entitySetResults; + + } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/56ce7ce6/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV1Impl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV1Impl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV1Impl.java index b40243d..119fb6d 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV1Impl.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV1Impl.java @@ -26,6 +26,8 @@ import java.util.UUID; import org.apache.usergrid.persistence.collection.MvccEntity; import org.apache.usergrid.persistence.collection.exception.DataCorruptionException; import org.apache.usergrid.persistence.collection.serialization.SerializationFig; +import org.apache.usergrid.persistence.core.astyanax.CassandraConfig; +import org.apache.usergrid.persistence.core.astyanax.CassandraFig; import org.apache.usergrid.persistence.core.astyanax.IdRowCompositeSerializer; import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamily; import 
org.apache.usergrid.persistence.core.astyanax.ScopedRowKey; @@ -69,8 +71,8 @@ public class MvccEntitySerializationStrategyV1Impl extends MvccEntitySerializati @Inject - public MvccEntitySerializationStrategyV1Impl( final Keyspace keyspace, final SerializationFig serializationFig ) { - super( keyspace, serializationFig ); + public MvccEntitySerializationStrategyV1Impl( final Keyspace keyspace, final SerializationFig serializationFig, final CassandraFig cassandraFig ) { + super( keyspace, serializationFig, cassandraFig ); } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/56ce7ce6/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV2Impl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV2Impl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV2Impl.java index 923c399..cd46c1e 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV2Impl.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV2Impl.java @@ -27,6 +27,7 @@ import org.apache.usergrid.persistence.collection.MvccEntity; import org.apache.usergrid.persistence.collection.exception.DataCorruptionException; import org.apache.usergrid.persistence.collection.exception.EntityTooLargeException; import org.apache.usergrid.persistence.collection.serialization.SerializationFig; +import org.apache.usergrid.persistence.core.astyanax.CassandraFig; import org.apache.usergrid.persistence.core.astyanax.FieldBuffer; import 
org.apache.usergrid.persistence.core.astyanax.FieldBufferBuilder; import org.apache.usergrid.persistence.core.astyanax.FieldBufferParser; @@ -72,8 +73,8 @@ public class MvccEntitySerializationStrategyV2Impl extends MvccEntitySerializati @Inject - public MvccEntitySerializationStrategyV2Impl( final Keyspace keyspace, final SerializationFig serializationFig ) { - super( keyspace, serializationFig ); + public MvccEntitySerializationStrategyV2Impl( final Keyspace keyspace, final SerializationFig serializationFig, final CassandraFig cassandraFig ) { + super( keyspace, serializationFig, cassandraFig ); entitySerializer = new EntitySerializer( serializationFig ); } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/56ce7ce6/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV2Test.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV2Test.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV2Test.java index a496519..5f633a1 100644 --- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV2Test.java +++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV2Test.java @@ -20,25 +20,25 @@ package org.apache.usergrid.persistence.collection.serialization.impl; -import java.lang.annotation.Annotation; import java.util.Collections; +import java.util.HashMap; import java.util.Iterator; +import java.util.Map; import java.util.UUID; import org.junit.After; import org.junit.Before; import org.junit.Test; -import org.safehaus.guicyfig.Bypass; 
-import org.safehaus.guicyfig.Env; -import org.safehaus.guicyfig.Option; import org.apache.usergrid.persistence.collection.CollectionScope; +import org.apache.usergrid.persistence.collection.EntitySet; import org.apache.usergrid.persistence.collection.MvccEntity; import org.apache.usergrid.persistence.collection.exception.EntityTooLargeException; import org.apache.usergrid.persistence.collection.impl.CollectionScopeImpl; import org.apache.usergrid.persistence.collection.mvcc.entity.impl.MvccEntityImpl; import org.apache.usergrid.persistence.collection.serialization.SerializationFig; import org.apache.usergrid.persistence.collection.util.EntityHelper; +import org.apache.usergrid.persistence.core.astyanax.CassandraFig; import org.apache.usergrid.persistence.core.guicyfig.SetConfigTestBypass; import org.apache.usergrid.persistence.core.util.ValidationUtils; import org.apache.usergrid.persistence.model.entity.Entity; @@ -49,6 +49,7 @@ import org.apache.usergrid.persistence.model.util.UUIDGenerator; import com.google.inject.Inject; import com.netflix.astyanax.connectionpool.exceptions.ConnectionException; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; @@ -58,21 +59,27 @@ public abstract class MvccEntitySerializationStrategyV2Test extends MvccEntitySe @Inject protected SerializationFig serializationFig; + @Inject + protected CassandraFig cassandraFig; + + private int setMaxEntitySize; @Before - public void setUp(){ + public void setUp() { - setMaxEntitySize = serializationFig.getMaxEntitySize(); + setMaxEntitySize = serializationFig.getMaxEntitySize(); } + @After - public void tearDown(){ + public void tearDown() { SetConfigTestBypass.setValueByPass( serializationFig, "getMaxEntitySize", setMaxEntitySize + "" ); } + /** * Tests an entity with more than 65535 bytes worth of data is successfully stored and retrieved */ @@ -83,7 +90,7 @@ public abstract class MvccEntitySerializationStrategyV2Test extends MvccEntitySe //this is the 
size it works out to be when serialized, we want to allow this size - SetConfigTestBypass.setValueByPass( serializationFig, "getMaxEntitySize", 65535*10+""); + SetConfigTestBypass.setValueByPass( serializationFig, "getMaxEntitySize", 65535 * 10 + "" ); final Entity entity = EntityHelper.generateEntity( setSize ); //now we have one massive, entity, save it and retrieve it. @@ -106,7 +113,12 @@ public abstract class MvccEntitySerializationStrategyV2Test extends MvccEntitySe getMvccEntitySerializationStrategy().loadDescendingHistory( context, id, version, 100 ); - assertLargeEntity( mvccEntity, loaded ); + assertTrue( loaded.hasNext() ); + + final MvccEntity loadedEntity = loaded.next(); + + assertLargeEntity( mvccEntity, loadedEntity ); + MvccEntity returned = serializationStrategy.load( context, Collections.singleton( id ), version ).getEntity( id ); @@ -115,7 +127,6 @@ public abstract class MvccEntitySerializationStrategyV2Test extends MvccEntitySe } - /** * Tests an entity with more than 65535 bytes worth of data is successfully stored and retrieved */ @@ -142,23 +153,77 @@ public abstract class MvccEntitySerializationStrategyV2Test extends MvccEntitySe } - protected void assertLargeEntity( final MvccEntity expected, final Iterator<MvccEntity> returned ) { - assertTrue( returned.hasNext() ); + /** + * Tests an entity with more than 65535 bytes worth of data is successfully stored and retrieved + */ + @Test + public void largeEntityReadWrite() throws ConnectionException { + + //this is the size it works out to be when serialized, we want to allow this size + + //extreme edge case, we can only get 2 entities per call + final int thriftBuffer = cassandraFig.getThriftBufferSize(); - final MvccEntity loadedEntity = returned.next(); - assertLargeEntity( expected, loadedEntity ); - } + //we use 20, using 2 causes cassandra to OOM. 
We don't have a large enough instance running locally - protected void assertLargeEntity( final MvccEntity expected, final MvccEntity returned ) { + final int maxEntitySize = ( int ) ( ( thriftBuffer * .9 ) / 20 ); - org.junit.Assert.assertEquals( "The loaded entity should match the stored entity", expected, returned ); - EntityHelper.verifyDeepEquals( expected.getEntity().get(), returned.getEntity().get() ); + SetConfigTestBypass.setValueByPass( serializationFig, "getMaxEntitySize", maxEntitySize + "" ); + + + final int size = 100; + + final HashMap<Id, MvccEntity> entities = new HashMap<>( size ); + + CollectionScope context = + new CollectionScopeImpl( new SimpleId( "organization" ), new SimpleId( "parent" ), "tests" ); + + + for ( int i = 0; i < size; i++ ) { + final Entity entity = EntityHelper.generateEntity( ( int ) (maxEntitySize*.4) ); + + //now we have one massive, entity, save it and retrieve it. + + final Id id = entity.getId(); + ValidationUtils.verifyIdentity( id ); + final UUID version = UUIDGenerator.newTimeUUID(); + final MvccEntity.Status status = MvccEntity.Status.COMPLETE; + + final MvccEntity mvccEntity = new MvccEntityImpl( id, version, status, entity ); + + + getMvccEntitySerializationStrategy().write( context, mvccEntity ).execute(); + + entities.put( id, mvccEntity ); + } + + + //now load it, we ask for 100 and we only are allowed 2 per trip due to our max size constraints. Should all + //still load (note that users should not be encouraged to use this strategy, it's a bad idea!) 
+ final EntitySet loaded = + getMvccEntitySerializationStrategy().load( context, entities.keySet(), UUIDGenerator.newTimeUUID() ); + + assertNotNull( "Entity set was loaded", loaded ); + + + for ( Map.Entry<Id, MvccEntity> entry : entities.entrySet() ) { + + final MvccEntity returned = loaded.getEntity( entry.getKey() ); + + assertLargeEntity( entry.getValue(), returned ); + } } + protected void assertLargeEntity( final MvccEntity expected, final MvccEntity returned ) { + + org.junit.Assert.assertEquals( "The loaded entity should match the stored entity", expected, returned ); + + EntityHelper.verifyDeepEquals( expected.getEntity().get(), returned.getEntity().get() ); + } }
