Initial UniqueValueSerialization conversion to CQL.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/0c609878 Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/0c609878 Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/0c609878 Branch: refs/heads/datastax-cass-driver Commit: 0c609878e1eccd35f31bc4fdc86bd3fe9da21593 Parents: ff3f7e8 Author: Michael Russo <[email protected]> Authored: Fri May 6 22:36:37 2016 +0800 Committer: Michael Russo <[email protected]> Committed: Fri May 6 22:36:37 2016 +0800 ---------------------------------------------------------------------- .../mvcc/stage/write/WriteCommit.java | 18 +- .../mvcc/stage/write/WriteUniqueVerify.java | 23 +- .../UniqueValueSerializationStrategy.java | 6 +- .../UniqueValueSerializationStrategyImpl.java | 287 ++++++++++++- ...iqueValueSerializationStrategyProxyImpl.java | 31 +- .../UniqueValueSerializationStrategyV1Impl.java | 410 ++++++++++++++++++- .../UniqueValueSerializationStrategyV2Impl.java | 379 ++++++++++++++++- .../migration/MvccEntityDataMigrationImpl.java | 26 +- .../mvcc/stage/delete/MarkCommitTest.java | 13 +- .../mvcc/stage/write/WriteCommitTest.java | 15 +- .../mvcc/stage/write/WriteUniqueVerifyTest.java | 6 +- ...niqueValueSerializationStrategyImplTest.java | 41 +- ...ctMvccEntityDataMigrationV1ToV3ImplTest.java | 5 +- .../core/datastax/impl/DataStaxClusterImpl.java | 3 + 14 files changed, 1169 insertions(+), 94 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/usergrid/blob/0c609878/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java 
b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java index 7eb96e7..cfac8e4 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java @@ -20,6 +20,8 @@ package org.apache.usergrid.persistence.collection.mvcc.stage.write; import java.util.UUID; +import com.datastax.driver.core.BatchStatement; +import com.datastax.driver.core.Session; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -67,11 +69,14 @@ public class WriteCommit implements Func1<CollectionIoEvent<MvccEntity>, Collect private final MvccEntitySerializationStrategy entityStrat; + private final Session session; + @Inject public WriteCommit( final MvccLogEntrySerializationStrategy logStrat, final MvccEntitySerializationStrategy entryStrat, - final UniqueValueSerializationStrategy uniqueValueStrat) { + final UniqueValueSerializationStrategy uniqueValueStrat, + final Session session) { Preconditions.checkNotNull( logStrat, "MvccLogEntrySerializationStrategy is required" ); Preconditions.checkNotNull( entryStrat, "MvccEntitySerializationStrategy is required" ); @@ -80,6 +85,7 @@ public class WriteCommit implements Func1<CollectionIoEvent<MvccEntity>, Collect this.logEntryStrat = logStrat; this.entityStrat = entryStrat; this.uniqueValueStrat = uniqueValueStrat; + this.session = session; } @@ -103,6 +109,8 @@ public class WriteCommit implements Func1<CollectionIoEvent<MvccEntity>, Collect final MvccLogEntry startEntry = new MvccLogEntryImpl( entityId, version, Stage.COMMITTED, MvccLogEntry.State.COMPLETE ); + + MutationBatch logMutation = logEntryStrat.write( applicationScope, startEntry ); // now get our actual insert into the entity data @@ -112,21 +120,23 @@ public class WriteCommit implements 
Func1<CollectionIoEvent<MvccEntity>, Collect logMutation.mergeShallow( entityMutation ); // re-write the unique values but this time with no TTL + final BatchStatement uniqueBatch = new BatchStatement(); + for ( Field field : EntityUtils.getUniqueFields(mvccEntity.getEntity().get()) ) { UniqueValue written = new UniqueValueImpl( field, entityId,version); - MutationBatch mb = uniqueValueStrat.write(applicationScope, written ); + uniqueBatch.add(uniqueValueStrat.writeCQL(applicationScope, written, -1 )); logger.debug("Finalizing {} unique value {}", field.getName(), field.getValue().toString()); - // merge into our existing mutation batch - logMutation.mergeShallow( mb ); + } try { logMutation.execute(); + session.execute(uniqueBatch); } catch ( ConnectionException e ) { logger.error( "Failed to execute write asynchronously ", e ); http://git-wip-us.apache.org/repos/asf/usergrid/blob/0c609878/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java index 585c26e..8e0b202 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java @@ -23,6 +23,8 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import com.datastax.driver.core.BatchStatement; +import com.datastax.driver.core.Session; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -71,14 +73,20 @@ public class WriteUniqueVerify implements 
Action1<CollectionIoEvent<MvccEntity>> protected final SerializationFig serializationFig; protected final Keyspace keyspace; + + protected final Session session; + private final CassandraConfig cassandraFig; @Inject public WriteUniqueVerify( final UniqueValueSerializationStrategy uniqueValueSerializiationStrategy, - final SerializationFig serializationFig, final Keyspace keyspace, final CassandraConfig cassandraFig ) { + final SerializationFig serializationFig, final Keyspace keyspace, + final CassandraConfig cassandraFig, final Session session ) { + this.keyspace = keyspace; this.cassandraFig = cassandraFig; + this.session = session; Preconditions.checkNotNull( uniqueValueSerializiationStrategy, "uniqueValueSerializationStrategy is required" ); Preconditions.checkNotNull( serializationFig, "serializationFig is required" ); @@ -101,7 +109,7 @@ public class WriteUniqueVerify implements Action1<CollectionIoEvent<MvccEntity>> final ApplicationScope scope = ioevent.getEntityCollection(); - final MutationBatch batch = keyspace.prepareMutationBatch(); + final BatchStatement batch = new BatchStatement(); //allocate our max size, worst case final List<Field> uniqueFields = new ArrayList<>( entity.getFields().size() ); @@ -119,9 +127,8 @@ public class WriteUniqueVerify implements Action1<CollectionIoEvent<MvccEntity>> final UniqueValue written = new UniqueValueImpl( field, mvccEntity.getId(), mvccEntity.getVersion() ); // use TTL in case something goes wrong before entity is finally committed - final MutationBatch mb = uniqueValueStrat.write( scope, written, serializationFig.getTimeout() ); + batch.add(uniqueValueStrat.writeCQL( scope, written, serializationFig.getTimeout() )); - batch.mergeShallow( mb ); uniqueFields.add(field); } @@ -131,12 +138,8 @@ public class WriteUniqueVerify implements Action1<CollectionIoEvent<MvccEntity>> } //perform the write - try { - batch.execute(); - } - catch ( ConnectionException ex ) { - throw new RuntimeException( "Unable to write to 
cassandra", ex ); - } + session.execute(batch); + // use simple thread pool to verify fields in parallel ConsistentReplayCommand cmd = new ConsistentReplayCommand(uniqueValueStrat,cassandraFig,scope, entity.getId().getType(), uniqueFields,entity); http://git-wip-us.apache.org/repos/asf/usergrid/blob/0c609878/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/UniqueValueSerializationStrategy.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/UniqueValueSerializationStrategy.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/UniqueValueSerializationStrategy.java index 3645107..56e8b87 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/UniqueValueSerializationStrategy.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/UniqueValueSerializationStrategy.java @@ -21,6 +21,8 @@ package org.apache.usergrid.persistence.collection.serialization; import java.util.Collection; import java.util.Iterator; +import com.datastax.driver.core.BatchStatement; +import org.apache.usergrid.persistence.collection.serialization.impl.UniqueValueSerializationStrategyImpl; import org.apache.usergrid.persistence.core.migration.data.VersionedData; import org.apache.usergrid.persistence.core.migration.schema.Migration; import org.apache.usergrid.persistence.core.scope.ApplicationScope; @@ -46,7 +48,6 @@ public interface UniqueValueSerializationStrategy extends Migration, VersionedDa * * @return MutatationBatch that encapsulates operation, caller may or may not execute. 
*/ - MutationBatch write( ApplicationScope applicationScope, UniqueValue uniqueValue ); /** * Write the specified UniqueValue to Cassandra with optional timeToLive in milliseconds. @@ -56,7 +57,8 @@ public interface UniqueValueSerializationStrategy extends Migration, VersionedDa * @param timeToLive How long object should live in seconds. -1 implies store forever * @return MutatationBatch that encapsulates operation, caller may or may not execute. */ - MutationBatch write( ApplicationScope applicationScope, UniqueValue uniqueValue, int timeToLive ); + + BatchStatement writeCQL(ApplicationScope applicationScope, UniqueValue uniqueValue, int timeToLive ); /** * Load UniqueValue that matches field from collection or null if that value does not exist. http://git-wip-us.apache.org/repos/asf/usergrid/blob/0c609878/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImpl.java index 0f27167..27a8609 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImpl.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImpl.java @@ -18,9 +18,19 @@ package org.apache.usergrid.persistence.collection.serialization.impl; +import java.nio.ByteBuffer; import java.util.*; +import com.datastax.driver.core.*; +import com.datastax.driver.core.querybuilder.Clause; +import com.datastax.driver.core.querybuilder.QueryBuilder; +import 
com.datastax.driver.core.querybuilder.Using; +import com.netflix.astyanax.model.*; +import com.netflix.astyanax.util.RangeBuilder; +import org.apache.usergrid.persistence.core.CassandraConfig; import org.apache.usergrid.persistence.core.datastax.TableDefinition; +import org.apache.usergrid.persistence.model.entity.SimpleId; +import org.apache.usergrid.persistence.model.field.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -40,18 +50,13 @@ import org.apache.usergrid.persistence.core.astyanax.ScopedRowKey; import org.apache.usergrid.persistence.core.scope.ApplicationScope; import org.apache.usergrid.persistence.core.util.ValidationUtils; import org.apache.usergrid.persistence.model.entity.Id; -import org.apache.usergrid.persistence.model.field.Field; import com.google.common.base.Preconditions; import com.netflix.astyanax.ColumnListMutation; import com.netflix.astyanax.Keyspace; import com.netflix.astyanax.MutationBatch; import com.netflix.astyanax.connectionpool.exceptions.ConnectionException; -import com.netflix.astyanax.model.Column; -import com.netflix.astyanax.model.ConsistencyLevel; -import com.netflix.astyanax.model.Row; import com.netflix.astyanax.query.RowQuery; -import com.netflix.astyanax.util.RangeBuilder; /** @@ -62,6 +67,9 @@ public abstract class UniqueValueSerializationStrategyImpl<FieldKey, EntityKey> private static final Logger log = LoggerFactory.getLogger( UniqueValueSerializationStrategyImpl.class ); + public static final String UUID_TYPE_REVERSED = "UUIDType(reversed=true)"; + + private final MultiTenantColumnFamily<ScopedRowKey<FieldKey>, EntityVersion> CF_UNIQUE_VALUES; @@ -70,6 +78,15 @@ public abstract class UniqueValueSerializationStrategyImpl<FieldKey, EntityKey> private final MultiTenantColumnFamily<ScopedRowKey<EntityKey>, UniqueFieldEntry> CF_ENTITY_UNIQUE_VALUE_LOG ; + private final String TABLE_UNIQUE_VALUES; + private final String TABLE_UNIQUE_VALUES_LOG; + + + private final Map COLUMNS_UNIQUE_VALUES; + private final 
Map COLUMNS_UNIQUE_VALUES_LOG; + + + public static final int COL_VALUE = 0x0; @@ -77,6 +94,9 @@ public abstract class UniqueValueSerializationStrategyImpl<FieldKey, EntityKey> protected final Keyspace keyspace; private final CassandraFig cassandraFig; + private final Session session; + private final CassandraConfig cassandraConfig; + /** * Construct serialization strategy for keyspace. @@ -86,13 +106,24 @@ public abstract class UniqueValueSerializationStrategyImpl<FieldKey, EntityKey> * @param serializationFig The serialization configuration */ public UniqueValueSerializationStrategyImpl( final Keyspace keyspace, final CassandraFig cassandraFig, - final SerializationFig serializationFig ) { + final SerializationFig serializationFig, + final Session session, final CassandraConfig cassandraConfig) { this.keyspace = keyspace; this.cassandraFig = cassandraFig; this.serializationFig = serializationFig; + this.session = session; + this.cassandraConfig = cassandraConfig; + CF_UNIQUE_VALUES = getUniqueValuesCF(); CF_ENTITY_UNIQUE_VALUE_LOG = getEntityUniqueLogCF(); + + TABLE_UNIQUE_VALUES = getUniqueValuesTable().getTableName(); + TABLE_UNIQUE_VALUES_LOG = getEntityUniqueLogTable().getTableName(); + + COLUMNS_UNIQUE_VALUES = getUniqueValuesTable().getColumns(); + COLUMNS_UNIQUE_VALUES_LOG = getEntityUniqueLogTable().getColumns(); + } @@ -129,7 +160,6 @@ public abstract class UniqueValueSerializationStrategyImpl<FieldKey, EntityKey> } - @Override public MutationBatch write( final ApplicationScope collectionScope, final UniqueValue value, final int timeToLive ) { @@ -163,6 +193,86 @@ public abstract class UniqueValueSerializationStrategyImpl<FieldKey, EntityKey> } ); } + @Override + public BatchStatement writeCQL( final ApplicationScope collectionScope, final UniqueValue value, + final int timeToLive ){ + + + Preconditions.checkNotNull( value, "value is required" ); + + BatchStatement batch = new BatchStatement(); + + Using ttl = null; + if(timeToLive > 0){ + + ttl = 
QueryBuilder.ttl(timeToLive); + + } + + final Id entityId = value.getEntityId(); + final UUID entityVersion = value.getEntityVersion(); + final Field<?> field = value.getField(); + + ValidationUtils.verifyIdentity( entityId ); + ValidationUtils.verifyVersion( entityVersion ); + + final EntityVersion ev = new EntityVersion( entityId, entityVersion ); + final UniqueFieldEntry uniqueFieldEntry = new UniqueFieldEntry( entityVersion, field ); + + ByteBuffer partitionKey = getPartitionKey(collectionScope.getApplication(), value.getEntityId().getType(), + field.getTypeName().toString(), field.getName(), field.getValue()); + + ByteBuffer logPartitionKey = getLogPartitionKey(collectionScope.getApplication(), value.getEntityId()); + + + if(ttl != null) { + + Statement uniqueValueStatement = QueryBuilder.insertInto(TABLE_UNIQUE_VALUES) + .value("key", partitionKey) + .value("column1", serializeUniqueValueColumn(ev)) + .value("value", DataType.serializeValue(COL_VALUE, ProtocolVersion.NEWEST_SUPPORTED)) + .using(ttl); + + batch.add(uniqueValueStatement); + + + }else{ + + Statement uniqueValueStatement = QueryBuilder.insertInto(TABLE_UNIQUE_VALUES) + .value("key", partitionKey) + .value("column1", serializeUniqueValueColumn(ev)) + .value("value", DataType.serializeValue(COL_VALUE, ProtocolVersion.NEWEST_SUPPORTED)); + + batch.add(uniqueValueStatement); + + } + + // we always want to retain the log entry, so never write with the TTL + Statement uniqueValueLogStatement = QueryBuilder.insertInto(TABLE_UNIQUE_VALUES_LOG) + .value("key", logPartitionKey) + .value("column1", serializeUniqueValueLogColumn(uniqueFieldEntry)) + .value("value", DataType.serializeValue(COL_VALUE, ProtocolVersion.NEWEST_SUPPORTED)); + + batch.add(uniqueValueLogStatement); + + + + return batch; + + /** + * @Override + public void doLookup( final ColumnListMutation<EntityVersion> colMutation ) { + colMutation.putColumn( ev, COL_VALUE ); + } + + + @Override + public void doLog( final 
ColumnListMutation<UniqueFieldEntry> colMutation ) { + colMutation.putColumn( uniqueFieldEntry, COL_VALUE ); + } + */ + } + @Override public MutationBatch delete( final ApplicationScope scope, UniqueValue value ) { @@ -236,18 +346,26 @@ public abstract class UniqueValueSerializationStrategyImpl<FieldKey, EntityKey> @Override public UniqueValueSet load( final ApplicationScope colScope, final String type, final Collection<Field> fields ) throws ConnectionException { - return load( colScope, ConsistencyLevel.valueOf( cassandraFig.getAstyanaxReadCL() ), type, fields ); + return load( colScope, com.netflix.astyanax.model.ConsistencyLevel.valueOf( cassandraFig.getAstyanaxReadCL() ), type, fields ); } @Override - public UniqueValueSet load( final ApplicationScope appScope, final ConsistencyLevel consistencyLevel, + public UniqueValueSet load( final ApplicationScope appScope, final com.netflix.astyanax.model.ConsistencyLevel consistencyLevel, final String type, final Collection<Field> fields ) throws ConnectionException { Preconditions.checkNotNull( fields, "fields are required" ); Preconditions.checkArgument( fields.size() > 0, "More than 1 field must be specified" ); + return loadCQL(appScope, com.datastax.driver.core.ConsistencyLevel.LOCAL_QUORUM, type, fields); + //return loadLegacy( appScope, type, fields); + + } + + + private UniqueValueSet loadLegacy(final ApplicationScope appScope, + final String type, final Collection<Field> fields) throws ConnectionException { final List<ScopedRowKey<FieldKey>> keys = new ArrayList<>( fields.size() ); final Id applicationId = appScope.getApplication(); @@ -265,16 +383,16 @@ public abstract class UniqueValueSerializationStrategyImpl<FieldKey, EntityKey> final UniqueValueSetImpl uniqueValueSet = new UniqueValueSetImpl( fields.size() ); - Iterator<Row<ScopedRowKey<FieldKey>, EntityVersion>> results = - keyspace.prepareQuery( CF_UNIQUE_VALUES ).setConsistencyLevel( consistencyLevel ).getKeySlice( keys ) - .withColumnRange( new 
RangeBuilder().setLimit( 1 ).build() ).execute().getResult().iterator(); + Iterator<com.netflix.astyanax.model.Row<ScopedRowKey<FieldKey>, EntityVersion>> results = + keyspace.prepareQuery( CF_UNIQUE_VALUES ).setConsistencyLevel(com.netflix.astyanax.model.ConsistencyLevel.CL_LOCAL_QUORUM ).getKeySlice( keys ) + .withColumnRange( new RangeBuilder().setLimit( 1 ).build() ).execute().getResult().iterator(); while ( results.hasNext() ) { - final Row<ScopedRowKey<FieldKey>, EntityVersion> unique = results.next(); + final com.netflix.astyanax.model.Row<ScopedRowKey<FieldKey>, EntityVersion> unique = results.next(); final Field field = parseRowKey( unique.getKey() ); @@ -296,9 +414,112 @@ public abstract class UniqueValueSerializationStrategyImpl<FieldKey, EntityKey> } return uniqueValueSet; + + } + + private UniqueValueSet loadCQL( final ApplicationScope appScope, final com.datastax.driver.core.ConsistencyLevel consistencyLevel, + final String type, final Collection<Field> fields ) throws ConnectionException { + + Preconditions.checkNotNull( fields, "fields are required" ); + Preconditions.checkArgument( fields.size() > 0, "More than 1 field must be specified" ); + + + final Id applicationId = appScope.getApplication(); + + // row key = app UUID + app type + entityType + field type + field name + field value + + List<ByteBuffer> partitionKeys = new ArrayList<>( fields.size() ); + for ( Field field : fields ) { + + //log.info(Bytes.toHexString(getPartitionKey(applicationId, type, field.getTypeName().toString(), field.getName(), field.getValue()))); + + partitionKeys.add(getPartitionKey(applicationId, type, field.getTypeName().toString(), field.getName(), field.getValue())); + + } + + final UniqueValueSetImpl uniqueValueSet = new UniqueValueSetImpl( fields.size() ); + + final Clause inKey = QueryBuilder.in("key", partitionKeys ); + + final Statement statement = QueryBuilder.select().all().from(TABLE_UNIQUE_VALUES) + .where(inKey) + 
.setConsistencyLevel(com.datastax.driver.core.ConsistencyLevel.LOCAL_QUORUM); + + final ResultSet resultSet = session.execute(statement); + + + Iterator<com.datastax.driver.core.Row> results = resultSet.iterator(); + + + while( results.hasNext() ){ + + final com.datastax.driver.core.Row unique = results.next(); + ByteBuffer partitionKey = unique.getBytes("key"); + ByteBuffer column = unique.getBytesUnsafe("column1"); + + List<Object> keyContents = deserializePartitionKey(partitionKey); + List<Object> columnContents = deserializeUniqueValueColumn(column); + + Field field = null; + FieldTypeName fieldType; + String name; + String value; + if(this instanceof UniqueValueSerializationStrategyV2Impl) { + + fieldType = FieldTypeName.valueOf((String) keyContents.get(3)); + name = (String) keyContents.get(4); + value = (String) keyContents.get(5); + + }else{ + + fieldType = FieldTypeName.valueOf((String) keyContents.get(5)); + name = (String) keyContents.get(6); + value = (String) keyContents.get(7); + + } + + switch ( fieldType ) { + case BOOLEAN: + field = new BooleanField( name, Boolean.parseBoolean( value ) ); + break; + case DOUBLE: + field = new DoubleField( name, Double.parseDouble( value ) ); + break; + case FLOAT: + field = new FloatField( name, Float.parseFloat( value ) ); + break; + case INTEGER: + field = new IntegerField( name, Integer.parseInt( value ) ); + break; + case LONG: + field = new LongField( name, Long.parseLong( value ) ); + break; + case STRING: + field = new StringField( name, value ); + break; + case UUID: + field = new UUIDField( name, UUID.fromString( value ) ); + break; + } + + final EntityVersion entityVersion = new EntityVersion( + new SimpleId((UUID)columnContents.get(1), (String)columnContents.get(2)), (UUID)columnContents.get(0)); + + + final UniqueValueImpl uniqueValue = + new UniqueValueImpl( field, entityVersion.getEntityId(), entityVersion.getEntityVersion() ); + + uniqueValueSet.addValue(uniqueValue); + + } + + return uniqueValueSet; 
+ } + + @Override public Iterator<UniqueValue> getAllUniqueFields( final ApplicationScope collectionScope, final Id entityId ) { Preconditions.checkNotNull( collectionScope, "collectionScope is required" ); @@ -378,7 +599,13 @@ public abstract class UniqueValueSerializationStrategyImpl<FieldKey, EntityKey> @Override public Collection<TableDefinition> getTables() { - return Collections.emptyList(); + final TableDefinition uniqueValues = getUniqueValuesTable(); + + final TableDefinition uniqueValuesLog = getEntityUniqueLogTable(); + + + return Arrays.asList( uniqueValues, uniqueValuesLog ); + } @@ -389,6 +616,12 @@ public abstract class UniqueValueSerializationStrategyImpl<FieldKey, EntityKey> /** + * Get the CQL table definition for the unique values log table + */ + protected abstract TableDefinition getUniqueValuesTable(); + + + /** * Generate a key that is compatible with the column family * * @param applicationId The applicationId @@ -405,10 +638,32 @@ public abstract class UniqueValueSerializationStrategyImpl<FieldKey, EntityKey> protected abstract Field parseRowKey(final ScopedRowKey<FieldKey> rowKey); + protected abstract List<Object> deserializePartitionKey(ByteBuffer bb); + + + protected abstract Object serializeUniqueValueLogColumn(UniqueFieldEntry fieldEntry); + + protected abstract ByteBuffer getPartitionKey(Id applicationId, String entityType, String fieldType, String fieldName, Object fieldValue ); + + protected abstract ByteBuffer getLogPartitionKey(final Id applicationId, final Id uniqueValueId); + + protected abstract ByteBuffer serializeUniqueValueColumn(EntityVersion entityVersion); + + protected abstract List<Object> deserializeUniqueValueColumn(ByteBuffer bb); + + + + + + /** + * Get the column family for the unique field CF + */ + protected abstract MultiTenantColumnFamily<ScopedRowKey<EntityKey>, UniqueFieldEntry> getEntityUniqueLogCF(); + /** - * Get the column family for the unique field CF + * Get the CQL table definition for the unique 
values log table */ - protected abstract MultiTenantColumnFamily<ScopedRowKey<EntityKey>, UniqueFieldEntry> getEntityUniqueLogCF(); + protected abstract TableDefinition getEntityUniqueLogTable(); /** * Generate a key that is compatible with the column family http://git-wip-us.apache.org/repos/asf/usergrid/blob/0c609878/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyProxyImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyProxyImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyProxyImpl.java index 87b1641..bbfaa2d 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyProxyImpl.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyProxyImpl.java @@ -24,6 +24,7 @@ import java.util.Collection; import java.util.Collections; import java.util.Iterator; +import com.datastax.driver.core.BatchStatement; import org.apache.usergrid.persistence.collection.serialization.UniqueValue; import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy; import org.apache.usergrid.persistence.collection.serialization.UniqueValueSet; @@ -67,40 +68,22 @@ public class UniqueValueSerializationStrategyProxyImpl implements UniqueValueSer @Override - public MutationBatch write( final ApplicationScope applicationScope, final UniqueValue uniqueValue ) { - final MigrationRelationship<UniqueValueSerializationStrategy> migration = getMigrationRelationShip(); - - if ( migration.needsMigration() ) { - 
final MutationBatch aggregateBatch = keyspace.prepareMutationBatch(); - - aggregateBatch.mergeShallow( migration.from.write( applicationScope, uniqueValue ) ); - aggregateBatch.mergeShallow( migration.to.write( applicationScope, uniqueValue ) ); - - return aggregateBatch; - } + public BatchStatement writeCQL(final ApplicationScope applicationScope, final UniqueValue uniqueValue, + final int timeToLive ){ - return migration.to.write( applicationScope, uniqueValue ); - } - - - @Override - public MutationBatch write( final ApplicationScope applicationScope, final UniqueValue uniqueValue, - final int timeToLive ) { final MigrationRelationship<UniqueValueSerializationStrategy> migration = getMigrationRelationShip(); if ( migration.needsMigration() ) { - final MutationBatch aggregateBatch = keyspace.prepareMutationBatch(); - - aggregateBatch.mergeShallow( migration.from.write( applicationScope, uniqueValue, timeToLive ) ); - aggregateBatch.mergeShallow( migration.to.write( applicationScope, uniqueValue, timeToLive ) ); + migration.from.writeCQL( applicationScope, uniqueValue, timeToLive ); + migration.to.writeCQL( applicationScope, uniqueValue, timeToLive ); - return aggregateBatch; } - return migration.to.write( applicationScope, uniqueValue, timeToLive ); + return migration.to.writeCQL( applicationScope, uniqueValue, timeToLive ); } + @Override public UniqueValueSet load( final ApplicationScope applicationScope, final String type, final Collection<Field> fields ) throws ConnectionException { http://git-wip-us.apache.org/repos/asf/usergrid/blob/0c609878/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyV1Impl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyV1Impl.java 
b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyV1Impl.java index 2235f63..75666fa 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyV1Impl.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyV1Impl.java @@ -20,20 +20,24 @@ package org.apache.usergrid.persistence.collection.serialization.impl; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; +import java.nio.ByteBuffer; +import java.util.*; +import com.datastax.driver.core.DataType; +import com.datastax.driver.core.ProtocolVersion; +import com.datastax.driver.core.Session; import org.apache.cassandra.db.marshal.BytesType; import org.apache.usergrid.persistence.collection.serialization.SerializationFig; import org.apache.usergrid.persistence.collection.serialization.impl.util.LegacyScopeUtils; +import org.apache.usergrid.persistence.core.CassandraConfig; import org.apache.usergrid.persistence.core.CassandraFig; import org.apache.usergrid.persistence.core.astyanax.ColumnTypes; import org.apache.usergrid.persistence.core.astyanax.IdRowCompositeSerializer; import org.apache.usergrid.persistence.core.astyanax.MultiTenantColumnFamily; import org.apache.usergrid.persistence.core.astyanax.MultiTenantColumnFamilyDefinition; import org.apache.usergrid.persistence.core.astyanax.ScopedRowKey; +import org.apache.usergrid.persistence.core.datastax.CQLUtils; import org.apache.usergrid.persistence.core.datastax.TableDefinition; import org.apache.usergrid.persistence.model.entity.Id; import org.apache.usergrid.persistence.model.field.Field; @@ -50,6 +54,40 @@ import com.netflix.astyanax.Keyspace; public class UniqueValueSerializationStrategyV1Impl extends 
UniqueValueSerializationStrategyImpl<CollectionPrefixedKey<Field>, CollectionPrefixedKey<Id>> { + + private static final String UNIQUE_VALUES_TABLE = CQLUtils.quote("Unique_Values"); + private static final Collection<String> UNIQUE_VALUES_PARTITION_KEYS = Collections.singletonList("key"); + private static final Collection<String> UNIQUE_VALUES_COLUMN_KEYS = Collections.singletonList("column1"); + private static final Map<String, DataType.Name> UNIQUE_VALUES_COLUMNS = + new HashMap<String, DataType.Name>() {{ + put( "key", DataType.Name.BLOB ); + put( "column1", DataType.Name.BLOB ); + put( "value", DataType.Name.BLOB ); }}; + private static final Map<String, String> UNIQUE_VALUES_CLUSTERING_ORDER = + new HashMap<String, String>(){{ put( "column1", "ASC" ); }}; + + + private static final String UNIQUE_VALUES_LOG_TABLE = CQLUtils.quote("Entity_Unique_Values"); + private static final Collection<String> UNIQUE_VALUES_LOG_PARTITION_KEYS = Collections.singletonList("key"); + private static final Collection<String> UNIQUE_VALUES_LOG_COLUMN_KEYS = Collections.singletonList("column1"); + private static final Map<String, DataType.Name> UNIQUE_VALUES_LOG_COLUMNS = + new HashMap<String, DataType.Name>() {{ + put( "key", DataType.Name.BLOB ); + put( "column1", DataType.Name.BLOB ); + put( "value", DataType.Name.BLOB ); }}; + private static final Map<String, String> UNIQUE_VALUES_LOG_CLUSTERING_ORDER = + new HashMap<String, String>(){{ put( "column1", "ASC" ); }}; + + + private final static TableDefinition uniqueValues = + new TableDefinition( UNIQUE_VALUES_TABLE, UNIQUE_VALUES_PARTITION_KEYS, UNIQUE_VALUES_COLUMN_KEYS, + UNIQUE_VALUES_COLUMNS, TableDefinition.CacheOption.KEYS, UNIQUE_VALUES_CLUSTERING_ORDER); + + private final static TableDefinition uniqueValuesLog = + new TableDefinition( UNIQUE_VALUES_LOG_TABLE, UNIQUE_VALUES_LOG_PARTITION_KEYS, UNIQUE_VALUES_LOG_COLUMN_KEYS, + UNIQUE_VALUES_LOG_COLUMNS, TableDefinition.CacheOption.KEYS, UNIQUE_VALUES_LOG_CLUSTERING_ORDER); + 
+ private static final CollectionScopedRowKeySerializer<Field> ROW_KEY_SER = new CollectionScopedRowKeySerializer<>( UniqueFieldRowKeySerializer.get() ); @@ -79,9 +117,11 @@ public class UniqueValueSerializationStrategyV1Impl extends UniqueValueSerializ * @param serializationFig The serialization configuration */ @Inject - public UniqueValueSerializationStrategyV1Impl( final Keyspace keyspace, final CassandraFig cassandraFig, - final SerializationFig serializationFig ) { - super( keyspace, cassandraFig, serializationFig ); + public UniqueValueSerializationStrategyV1Impl(final Keyspace keyspace, final CassandraFig cassandraFig, + final SerializationFig serializationFig, + final Session session, + final CassandraConfig cassandraConfig) { + super( keyspace, cassandraFig, serializationFig, session, cassandraConfig ); } @@ -113,6 +153,12 @@ public class UniqueValueSerializationStrategyV1Impl extends UniqueValueSerializ return CF_UNIQUE_VALUES; } + @Override + protected TableDefinition getUniqueValuesTable(){ + + return uniqueValues; + } + @Override protected MultiTenantColumnFamily<ScopedRowKey<CollectionPrefixedKey<Id>>, UniqueFieldEntry> @@ -122,6 +168,14 @@ public class UniqueValueSerializationStrategyV1Impl extends UniqueValueSerializ @Override + protected TableDefinition getEntityUniqueLogTable(){ + + return uniqueValuesLog; + + } + + + @Override protected CollectionPrefixedKey<Field> createUniqueValueKey( final Id applicationId, final String type, final Field field) { @@ -141,6 +195,242 @@ public class UniqueValueSerializationStrategyV1Impl extends UniqueValueSerializ return rowKey.getKey().getSubKey(); } + @Override + protected List<Object> deserializePartitionKey(ByteBuffer bb){ + + + /** + * List<Object> keys = new ArrayList<>(8); + keys.add(0, appUUID); + keys.add(1, applicationType); + keys.add(2, appUUID); + keys.add(3, applicationType); + keys.add(4, entityType); + keys.add(5, fieldType); + keys.add(6, fieldName); + keys.add(7, fieldValueString); + + */ + + 
int count = 0; + List<Object> stuff = new ArrayList<>(); + while(bb.hasRemaining()){ + ByteBuffer data = CQLUtils.getWithShortLength(bb); + if(count == 0 || count == 2){ + stuff.add(DataType.uuid().deserialize(data.slice(), ProtocolVersion.NEWEST_SUPPORTED)); + }else{ + stuff.add(DataType.text().deserialize(data.slice(), ProtocolVersion.NEWEST_SUPPORTED)); + } + byte equality = bb.get(); // we don't use this but take the equality byte off the buffer + count++; + } + + return stuff; + + } + + @Override + protected Object serializeUniqueValueLogColumn(UniqueFieldEntry fieldEntry){ + + /** + * final UUID version = value.getVersion(); + final Field<?> field = value.getField(); + + final FieldTypeName fieldType = field.getTypeName(); + final String fieldValue = field.getValue().toString().toLowerCase(); + + + DynamicComposite composite = new DynamicComposite( ); + + //we want to sort ascending to descending by version + composite.addComponent( version, UUID_SERIALIZER, ColumnTypes.UUID_TYPE_REVERSED); + composite.addComponent( field.getName(), STRING_SERIALIZER ); + composite.addComponent( fieldValue, STRING_SERIALIZER ); + composite.addComponent( fieldType.name() , STRING_SERIALIZER); + */ + + // values are serialized as strings, not sure why, and always lower cased + String fieldValueString = fieldEntry.getField().getValue().toString().toLowerCase(); + + + List<Object> keys = new ArrayList<>(4); + keys.add(fieldEntry.getVersion()); + keys.add(fieldEntry.getField().getName()); + keys.add(fieldValueString); + keys.add(fieldEntry.getField().getTypeName().name()); + + String comparator = UUID_TYPE_REVERSED; + + int size = 16+fieldEntry.getField().getName().length()+fieldEntry.getField().getValue().toString().length()+ + fieldEntry.getField().getTypeName().name().length(); + + // we always need to add length for the 2 byte comparator short, 2 byte length short and 1 byte equality + size += keys.size()*65; + + // uuid type comparator is longest, ensure we allocate buffer 
using the max size to avoid overflow + size += keys.size()*comparator.length(); + + ByteBuffer stuff = ByteBuffer.allocate(size); + + + for (Object key : keys) { + + if(key.equals(fieldEntry.getVersion())) { + int p = comparator.indexOf("(reversed=true)"); + boolean desc = false; + if (p >= 0) { + comparator = comparator.substring(0, p); + desc = true; + } + + byte a = (byte) 85; // this is the byte value for UUIDType in astyanax used in legacy data + if (desc) { + a = (byte) Character.toUpperCase((char) a); + } + + stuff.putShort((short) ('\u8000' | a)); + }else{ + comparator = "UTF8Type"; // only strings are being serialized other than UUIDs here + stuff.putShort((short)comparator.length()); + stuff.put(DataType.serializeValue(comparator, ProtocolVersion.NEWEST_SUPPORTED)); + } + + ByteBuffer kb = DataType.serializeValue(key, ProtocolVersion.NEWEST_SUPPORTED); + if (kb == null) { + kb = ByteBuffer.allocate(0); + } + + // put a short that indicates how big the buffer is for this item + stuff.putShort((short) kb.remaining()); + + // put the actual item + stuff.put(kb.slice()); + + // put an equality byte ( again not used by part of legacy thrift Astyanax schema) + stuff.put((byte) 0); + + + } + + stuff.flip(); + return stuff.duplicate(); + + } + + @Override + protected ByteBuffer getPartitionKey(Id applicationId, String entityType, String fieldType, String fieldName, Object fieldValue ){ + + return serializeKey(applicationId.getUuid(), applicationId.getType(), + entityType, fieldType, fieldName, fieldValue); + + } + + @Override + protected ByteBuffer getLogPartitionKey(final Id applicationId, final Id uniqueValueId){ + + return serializeLogKey(applicationId.getUuid(), applicationId.getType(), + uniqueValueId.getUuid(), uniqueValueId.getType()); + + } + + @Override + protected ByteBuffer serializeUniqueValueColumn(EntityVersion entityVersion){ + + /** + * final Id entityId = ev.getEntityId(); + final UUID entityUuid = entityId.getUuid(); + final String entityType = 
entityId.getType(); + + CompositeBuilder builder = Composites.newDynamicCompositeBuilder(); + + builder.addUUID( entityVersion ); + builder.addUUID( entityUuid ); + builder.addString(entityType ); + */ + + String comparator = "UTF8Type"; + + List<Object> keys = new ArrayList<>(3); + keys.add(entityVersion.getEntityVersion()); + keys.add(entityVersion.getEntityId().getUuid()); + keys.add(entityVersion.getEntityId().getType()); + + // UUIDs are 16 bytes + int size = 16+16+entityVersion.getEntityId().getType().length(); + + // we always need to add length for the 2 byte comparator short, 2 byte length short and 1 byte equality + size += keys.size()*5; + + // we always add comparator to the buffer as well + size += keys.size()*comparator.length(); + + ByteBuffer stuff = ByteBuffer.allocate(size); + + for (Object key : keys) { + + if(key instanceof UUID){ + comparator = "UUIDType"; + }else{ + comparator = "UTF8Type"; // if it's not a UUID, the only other thing we're serializing is text + } + + stuff.putShort((short)comparator.length()); + stuff.put(DataType.serializeValue(comparator, ProtocolVersion.NEWEST_SUPPORTED)); + + ByteBuffer kb = DataType.serializeValue(key, ProtocolVersion.NEWEST_SUPPORTED); + if (kb == null) { + kb = ByteBuffer.allocate(0); + } + + // put a short that indicates how big the buffer is for this item + stuff.putShort((short) kb.remaining()); + + // put the actual item + stuff.put(kb.slice()); + + // put an equality byte ( again not used by part of legacy thrift Astyanax schema) + stuff.put((byte) 0); + + + } + + stuff.flip(); + return stuff.duplicate(); + + } + + @Override + protected List<Object> deserializeUniqueValueColumn(ByteBuffer bb){ + + List<Object> stuff = new ArrayList<>(); + int count = 0; + while(bb.hasRemaining()){ + + // custom columns have a short at beginning for comparator (which we don't use here ) + ByteBuffer comparator = CQLUtils.getWithShortLength(bb); + + ByteBuffer data = CQLUtils.getWithShortLength(bb); + + + // first 
two composites are UUIDs, rest are strings + if(count == 0) { + stuff.add(new UUID(data.getLong(), data.getLong())); + }else if(count ==1){ + stuff.add(new UUID(data.getLong(), data.getLong())); + }else{ + stuff.add(DataType.text().deserialize(data.duplicate(), ProtocolVersion.NEWEST_SUPPORTED)); + } + + byte equality = bb.get(); // we don't use this but take the equality byte off the buffer + + count++; + } + + return stuff; + + } + + @Override protected CollectionPrefixedKey<Id> createEntityUniqueLogKey( final Id applicationId, @@ -163,4 +453,112 @@ public class UniqueValueSerializationStrategyV1Impl extends UniqueValueSerializ public int getImplementationVersion() { return CollectionDataVersions.INITIAL.getVersion(); } + + + + private ByteBuffer serializeKey( UUID appUUID, + String applicationType, + String entityType, + String fieldType, + String fieldName, + Object fieldValue ){ + + final String collectionName = LegacyScopeUtils.getCollectionScopeNameFromEntityType( entityType ); + +// final CollectionPrefixedKey<Field> uniquePrefixedKey = +// new CollectionPrefixedKey<>( collectionName, applicationId, field ); + +// //read back the id +// final Id orgId = ID_SER.fromComposite( parser ); +// final Id scopeId = ID_SER.fromComposite( parser ); +// final String scopeName = parser.readString(); +// final K value = keySerializer.fromComposite( parser ); + + + // values are serialized as strings, not sure why, and always lower cased + String fieldValueString = fieldValue.toString().toLowerCase(); + + List<Object> keys = new ArrayList<>(8); + keys.add(0, appUUID); + keys.add(1, applicationType); + keys.add(2, appUUID); + keys.add(3, applicationType); + keys.add(4, collectionName); + keys.add(5, fieldType); + keys.add(6, fieldName); + keys.add(7, fieldValueString); + + + // UUIDs are 16 bytes, allocate the buffer accordingly + int size = 16 + applicationType.length() + 16 + applicationType.length() + collectionName.length() + + fieldType.length() + 
fieldName.length()+fieldValueString.length(); + + + // we always need to add length for the 2 byte short and 1 byte equality + size += keys.size()*3; + + ByteBuffer stuff = ByteBuffer.allocate(size); + + for (Object key : keys) { + + ByteBuffer kb = DataType.serializeValue(key, ProtocolVersion.NEWEST_SUPPORTED); + if (kb == null) { + kb = ByteBuffer.allocate(0); + } + + stuff.putShort((short) kb.remaining()); + stuff.put(kb.slice()); + stuff.put((byte) 0); + + + } + stuff.flip(); + return stuff.duplicate(); + + } + + private ByteBuffer serializeLogKey(UUID appUUID, String applicationType, UUID entityId, String entityType){ + + + final String collectionName = LegacyScopeUtils.getCollectionScopeNameFromEntityType( entityType ); +// +// +// final CollectionPrefixedKey<Id> collectionPrefixedEntityKey = +// new CollectionPrefixedKey<>( collectionName, applicationId, uniqueValueId ); + + List<Object> keys = new ArrayList<>(4); + keys.add(appUUID); + keys.add(applicationType); + keys.add(appUUID); + keys.add(applicationType); + keys.add(collectionName); + keys.add(entityId); + keys.add(entityType); + + int size = 16+applicationType.length()+16+applicationType.length()+collectionName.length()+16+entityType.length(); + + // we always need to add length for the 2 byte short and 1 byte equality + size += keys.size()*3; + + ByteBuffer stuff = ByteBuffer.allocate(size); + + for (Object key : keys) { + + ByteBuffer kb = DataType.serializeValue(key, ProtocolVersion.NEWEST_SUPPORTED); + if (kb == null) { + kb = ByteBuffer.allocate(0); + } + + stuff.putShort((short) kb.remaining()); + stuff.put(kb.slice()); + stuff.put((byte) 0); + + + } + stuff.flip(); + return stuff.duplicate(); + + } + + } http://git-wip-us.apache.org/repos/asf/usergrid/blob/0c609878/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyV2Impl.java ---------------------------------------------------------------------- diff 
--git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyV2Impl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyV2Impl.java index 0f233cf..4177c37 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyV2Impl.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyV2Impl.java @@ -20,13 +20,16 @@ package org.apache.usergrid.persistence.collection.serialization.impl; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; +import java.nio.ByteBuffer; +import java.util.*; +import com.datastax.driver.core.DataType; +import com.datastax.driver.core.ProtocolVersion; +import com.datastax.driver.core.Session; import org.apache.cassandra.db.marshal.BytesType; import org.apache.usergrid.persistence.collection.serialization.SerializationFig; +import org.apache.usergrid.persistence.core.CassandraConfig; import org.apache.usergrid.persistence.core.CassandraFig; import org.apache.usergrid.persistence.core.astyanax.ColumnTypes; import org.apache.usergrid.persistence.core.astyanax.IdRowCompositeSerializer; @@ -34,6 +37,7 @@ import org.apache.usergrid.persistence.core.astyanax.MultiTenantColumnFamily; import org.apache.usergrid.persistence.core.astyanax.MultiTenantColumnFamilyDefinition; import org.apache.usergrid.persistence.core.astyanax.ScopedRowKey; import org.apache.usergrid.persistence.core.astyanax.ScopedRowKeySerializer; +import org.apache.usergrid.persistence.core.datastax.CQLUtils; import org.apache.usergrid.persistence.core.datastax.TableDefinition; import org.apache.usergrid.persistence.model.entity.Id; import org.apache.usergrid.persistence.model.field.Field; 
@@ -49,6 +53,37 @@ import com.netflix.astyanax.Keyspace; @Singleton public class UniqueValueSerializationStrategyV2Impl extends UniqueValueSerializationStrategyImpl<TypeField, Id> { + private static final String UNIQUE_VALUES_TABLE = CQLUtils.quote("Unique_Values_V2"); + private static final Collection<String> UNIQUE_VALUES_PARTITION_KEYS = Collections.singletonList("key"); + private static final Collection<String> UNIQUE_VALUES_COLUMN_KEYS = Collections.singletonList("column1"); + private static final Map<String, DataType.Name> UNIQUE_VALUES_COLUMNS = + new HashMap<String, DataType.Name>() {{ + put( "key", DataType.Name.BLOB ); + put( "column1", DataType.Name.BLOB ); + put( "value", DataType.Name.BLOB ); }}; + private static final Map<String, String> UNIQUE_VALUES_CLUSTERING_ORDER = + new HashMap<String, String>(){{ put( "column1", "ASC" ); }}; + + + private static final String UNIQUE_VALUES_LOG_TABLE = CQLUtils.quote("Entity_Unique_Values_V2"); + private static final Collection<String> UNIQUE_VALUES_LOG_PARTITION_KEYS = Collections.singletonList("key"); + private static final Collection<String> UNIQUE_VALUES_LOG_COLUMN_KEYS = Collections.singletonList("column1"); + private static final Map<String, DataType.Name> UNIQUE_VALUES_LOG_COLUMNS = + new HashMap<String, DataType.Name>() {{ + put( "key", DataType.Name.BLOB ); + put( "column1", DataType.Name.BLOB ); + put( "value", DataType.Name.BLOB ); }}; + private static final Map<String, String> UNIQUE_VALUES_LOG_CLUSTERING_ORDER = + new HashMap<String, String>(){{ put( "column1", "ASC" ); }}; + + private final static TableDefinition uniqueValues = + new TableDefinition( UNIQUE_VALUES_TABLE, UNIQUE_VALUES_PARTITION_KEYS, UNIQUE_VALUES_COLUMN_KEYS, + UNIQUE_VALUES_COLUMNS, TableDefinition.CacheOption.KEYS, UNIQUE_VALUES_CLUSTERING_ORDER); + + private final static TableDefinition uniqueValuesLog = + new TableDefinition( UNIQUE_VALUES_LOG_TABLE, UNIQUE_VALUES_LOG_PARTITION_KEYS, UNIQUE_VALUES_LOG_COLUMN_KEYS, + 
UNIQUE_VALUES_LOG_COLUMNS, TableDefinition.CacheOption.KEYS, UNIQUE_VALUES_LOG_CLUSTERING_ORDER); + private static final ScopedRowKeySerializer<TypeField> ROW_KEY_SER = new ScopedRowKeySerializer<>( UniqueTypeFieldRowKeySerializer.get() ); @@ -80,8 +115,10 @@ public class UniqueValueSerializationStrategyV2Impl extends UniqueValueSerializ */ @Inject public UniqueValueSerializationStrategyV2Impl( final Keyspace keyspace, final CassandraFig cassandraFig, - final SerializationFig serializationFig ) { - super( keyspace, cassandraFig, serializationFig ); + final SerializationFig serializationFig, + final Session session, + final CassandraConfig cassandraConfig) { + super( keyspace, cassandraFig, serializationFig, session, cassandraConfig ); } @@ -104,7 +141,9 @@ public class UniqueValueSerializationStrategyV2Impl extends UniqueValueSerializ @Override public Collection<TableDefinition> getTables() { - return Collections.emptyList(); + + return Arrays.asList( uniqueValues, uniqueValuesLog ); + } @Override @@ -114,6 +153,12 @@ public class UniqueValueSerializationStrategyV2Impl extends UniqueValueSerializ @Override + protected TableDefinition getUniqueValuesTable(){ + return uniqueValues; + } + + + @Override protected MultiTenantColumnFamily<ScopedRowKey<Id>, UniqueFieldEntry> getEntityUniqueLogCF() { return CF_ENTITY_UNIQUE_VALUE_LOG; @@ -121,6 +166,13 @@ public class UniqueValueSerializationStrategyV2Impl extends UniqueValueSerializ @Override + protected TableDefinition getEntityUniqueLogTable(){ + return uniqueValuesLog; + } + + + + @Override protected TypeField createUniqueValueKey( final Id applicationId, final String type, final Field field) { return new TypeField(type,field); } @@ -131,6 +183,238 @@ public class UniqueValueSerializationStrategyV2Impl extends UniqueValueSerializ return rowKey.getKey().getField(); } + @Override + protected List<Object> deserializePartitionKey(ByteBuffer bb){ + + + /** + * List<Object> keys = new ArrayList<>(6); + keys.add(0, appUUID); 
// UUID + keys.add(1, applicationType); // String + keys.add(2, entityType); // String + keys.add(3, fieldType); // String + keys.add(4, fieldName); // String + keys.add(5, fieldValueString); // String + + */ + + List<Object> stuff = new ArrayList<>(); + while(bb.hasRemaining()){ + ByteBuffer data = CQLUtils.getWithShortLength(bb); + if(stuff.size() == 0){ + stuff.add(DataType.uuid().deserialize(data.slice(), ProtocolVersion.NEWEST_SUPPORTED)); + }else{ + stuff.add(DataType.text().deserialize(data.slice(), ProtocolVersion.NEWEST_SUPPORTED)); + } + byte equality = bb.get(); // we don't use this but take the equality byte off the buffer + + } + + return stuff; + + } + + @Override + protected Object serializeUniqueValueLogColumn(UniqueFieldEntry fieldEntry){ + + /** + * final UUID version = value.getVersion(); + final Field<?> field = value.getField(); + + final FieldTypeName fieldType = field.getTypeName(); + final String fieldValue = field.getValue().toString().toLowerCase(); + + + DynamicComposite composite = new DynamicComposite( ); + + //we want to sort ascending to descending by version + composite.addComponent( version, UUID_SERIALIZER, ColumnTypes.UUID_TYPE_REVERSED); + composite.addComponent( field.getName(), STRING_SERIALIZER ); + composite.addComponent( fieldValue, STRING_SERIALIZER ); + composite.addComponent( fieldType.name() , STRING_SERIALIZER); + */ + + // values are serialized as strings, not sure why, and always lower cased + String fieldValueString = fieldEntry.getField().getValue().toString().toLowerCase(); + + + List<Object> keys = new ArrayList<>(4); + keys.add(fieldEntry.getVersion()); + keys.add(fieldEntry.getField().getName()); + keys.add(fieldValueString); + keys.add(fieldEntry.getField().getTypeName().name()); + + String comparator = UUID_TYPE_REVERSED; + + int size = 16+fieldEntry.getField().getName().length()+fieldEntry.getField().getValue().toString().length()+ + fieldEntry.getField().getTypeName().name().length(); + + // we always need 
to add length for the 2 byte comparator short, 2 byte length short and 1 byte equality + size += keys.size()*65; + + // uuid type comparator is longest, ensure we allocate buffer using the max size to avoid overflow + size += keys.size()*comparator.length(); + + ByteBuffer stuff = ByteBuffer.allocate(size); + + + for (Object key : keys) { + + if(key.equals(fieldEntry.getVersion())) { + int p = comparator.indexOf("(reversed=true)"); + boolean desc = false; + if (p >= 0) { + comparator = comparator.substring(0, p); + desc = true; + } + + byte a = (byte) 85; // this is the byte value for UUIDType in astyanax used in legacy data + if (desc) { + a = (byte) Character.toUpperCase((char) a); + } + + stuff.putShort((short) ('\u8000' | a)); + }else{ + comparator = "UTF8Type"; // only strings are being serialized other than UUIDs here + stuff.putShort((short)comparator.length()); + stuff.put(DataType.serializeValue(comparator, ProtocolVersion.NEWEST_SUPPORTED)); + } + + ByteBuffer kb = DataType.serializeValue(key, ProtocolVersion.NEWEST_SUPPORTED); + if (kb == null) { + kb = ByteBuffer.allocate(0); + } + + // put a short that indicates how big the buffer is for this item + stuff.putShort((short) kb.remaining()); + + // put the actual item + stuff.put(kb.slice()); + + // put an equality byte ( again not used by part of legacy thrift Astyanax schema) + stuff.put((byte) 0); + + + } + + stuff.flip(); + return stuff.duplicate(); + + } + + @Override + protected ByteBuffer getPartitionKey(Id applicationId, String entityType, String fieldType, String fieldName, Object fieldValue ){ + + return serializeKey(applicationId.getUuid(), applicationId.getType(), + entityType, fieldType, fieldName, fieldValue); + + } + + @Override + protected ByteBuffer getLogPartitionKey(final Id applicationId, final Id uniqueValueId){ + + return serializeLogKey(applicationId.getUuid(), applicationId.getType(), + uniqueValueId.getUuid(), uniqueValueId.getType()); + + } + + @Override + protected ByteBuffer 
serializeUniqueValueColumn(EntityVersion entityVersion){ + + /** + * final Id entityId = ev.getEntityId(); + final UUID entityUuid = entityId.getUuid(); + final String entityType = entityId.getType(); + + CompositeBuilder builder = Composites.newDynamicCompositeBuilder(); + + builder.addUUID( entityVersion ); + builder.addUUID( entityUuid ); + builder.addString(entityType ); + */ + + String comparator = "UTF8Type"; + + List<Object> keys = new ArrayList<>(3); + keys.add(entityVersion.getEntityVersion()); + keys.add(entityVersion.getEntityId().getUuid()); + keys.add(entityVersion.getEntityId().getType()); + + // UUIDs are 16 bytes + int size = 16+16+entityVersion.getEntityId().getType().length(); + + // we always need to add length for the 2 byte comparator short, 2 byte length short and 1 byte equality + size += keys.size()*5; + + // we always add comparator to the buffer as well + size += keys.size()*comparator.length(); + + ByteBuffer stuff = ByteBuffer.allocate(size); + + for (Object key : keys) { + + if(key instanceof UUID){ + comparator = "UUIDType"; + }else{ + comparator = "UTF8Type"; // if it's not a UUID, the only other thing we're serializing is text + } + + stuff.putShort((short)comparator.length()); + stuff.put(DataType.serializeValue(comparator, ProtocolVersion.NEWEST_SUPPORTED)); + + ByteBuffer kb = DataType.serializeValue(key, ProtocolVersion.NEWEST_SUPPORTED); + if (kb == null) { + kb = ByteBuffer.allocate(0); + } + + // put a short that indicates how big the buffer is for this item + stuff.putShort((short) kb.remaining()); + + // put the actual item + stuff.put(kb.slice()); + + // put an equality byte ( again not used by part of legacy thrift Astyanax schema) + stuff.put((byte) 0); + + + } + + stuff.flip(); + return stuff.duplicate(); + + } + + @Override + protected List<Object> deserializeUniqueValueColumn(ByteBuffer bb){ + + List<Object> stuff = new ArrayList<>(); + int count = 0; + while(bb.hasRemaining()){ + + // custom columns have a short at 
beginning for comparator (which we don't use here ) + ByteBuffer comparator = CQLUtils.getWithShortLength(bb); + + ByteBuffer data = CQLUtils.getWithShortLength(bb); + + + // first two composites are UUIDs, rest are strings + if(count == 0) { + stuff.add(new UUID(data.getLong(), data.getLong())); + }else if(count ==1){ + stuff.add(new UUID(data.getLong(), data.getLong())); + }else{ + stuff.add(DataType.text().deserialize(data.duplicate(), ProtocolVersion.NEWEST_SUPPORTED)); + } + + byte equality = bb.get(); // we don't use this but take the equality byte off the buffer + + count++; + } + + return stuff; + + } + @Override protected Id createEntityUniqueLogKey( final Id applicationId, final Id uniqueValueId ) { @@ -142,4 +426,87 @@ public class UniqueValueSerializationStrategyV2Impl extends UniqueValueSerializ public int getImplementationVersion() { return CollectionDataVersions.LOG_REMOVAL.getVersion(); } + + + + // row key = app UUID + app type + app UUID + app type + field type + field name + field value + private ByteBuffer serializeKey(UUID appUUID, + String applicationType, + String entityType, + String fieldType, + String fieldName, + Object fieldValue ){ + + // values are serialized as strings, not sure why, and always lower cased + String fieldValueString = fieldValue.toString().toLowerCase(); + + List<Object> keys = new ArrayList<>(6); + keys.add(0, appUUID); + keys.add(1, applicationType); + keys.add(2, entityType); + keys.add(3, fieldType); + keys.add(4, fieldName); + keys.add(5, fieldValueString); + + + // UUIDs are 16 bytes, allocate the buffer accordingly + int size = 16 + applicationType.length() + entityType.length() + fieldType.length() + fieldName.length()+fieldValueString.length(); + + + // we always need to add length for the 2 byte short and 1 byte equality + size += keys.size()*3; + + ByteBuffer stuff = ByteBuffer.allocate(size); + + for (Object key : keys) { + + ByteBuffer kb = DataType.serializeValue(key, ProtocolVersion.NEWEST_SUPPORTED); + 
if (kb == null) { + kb = ByteBuffer.allocate(0); + } + + stuff.putShort((short) kb.remaining()); + stuff.put(kb.slice()); + stuff.put((byte) 0); + + + } + stuff.flip(); + return stuff.duplicate(); + + } + + private ByteBuffer serializeLogKey(UUID appUUID, String applicationType, UUID entityId, String entityType){ + + List<Object> keys = new ArrayList<>(4); + keys.add(appUUID); + keys.add(applicationType); + keys.add(entityId); + keys.add(entityType); + + int size = 16+applicationType.length()+16+entityType.length(); + + // we always need to add length for the 2 byte short and 1 byte equality + size += keys.size()*3; + + ByteBuffer stuff = ByteBuffer.allocate(size); + + for (Object key : keys) { + + ByteBuffer kb = DataType.serializeValue(key, ProtocolVersion.NEWEST_SUPPORTED); + if (kb == null) { + kb = ByteBuffer.allocate(0); + } + + stuff.putShort((short) kb.remaining()); + stuff.put(kb.slice()); + stuff.put((byte) 0); + + + } + stuff.flip(); + return stuff.duplicate(); + + } + } http://git-wip-us.apache.org/repos/asf/usergrid/blob/0c609878/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/MvccEntityDataMigrationImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/MvccEntityDataMigrationImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/MvccEntityDataMigrationImpl.java index a110ed7..8d52d8b 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/MvccEntityDataMigrationImpl.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/MvccEntityDataMigrationImpl.java @@ -26,6 +26,8 @@ import java.util.List; import 
java.util.UUID; import java.util.concurrent.atomic.AtomicLong; +import com.datastax.driver.core.Session; +import org.apache.cassandra.cql.BatchStatement; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -71,6 +73,7 @@ public class MvccEntityDataMigrationImpl implements DataMigration{ private static final Logger logger = LoggerFactory.getLogger( MvccEntityDataMigrationImpl.class ); private final Keyspace keyspace; + private final Session session; private final VersionedMigrationSet<MvccEntitySerializationStrategy> allVersions; private final MvccEntitySerializationStrategyV3Impl mvccEntitySerializationStrategyV3; private final UniqueValueSerializationStrategy uniqueValueSerializationStrategy; @@ -80,12 +83,14 @@ public class MvccEntityDataMigrationImpl implements DataMigration{ @Inject public MvccEntityDataMigrationImpl( final Keyspace keyspace, + final Session session, final VersionedMigrationSet<MvccEntitySerializationStrategy> allVersions, final MvccEntitySerializationStrategyV3Impl mvccEntitySerializationStrategyV3, final UniqueValueSerializationStrategy uniqueValueSerializationStrategy, final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy, final MigrationDataProvider<EntityIdScope> migrationDataProvider ) { this.keyspace = keyspace; + this.session = session; this.allVersions = allVersions; this.mvccEntitySerializationStrategyV3 = mvccEntitySerializationStrategyV3; this.uniqueValueSerializationStrategy = uniqueValueSerializationStrategy; @@ -163,8 +168,9 @@ public class MvccEntityDataMigrationImpl implements DataMigration{ final List<Id> toSaveIds = new ArrayList<>( entities.size() ); + final com.datastax.driver.core.BatchStatement uniqueBatch = new com.datastax.driver.core.BatchStatement(); - for ( EntityToSaveMessage message : entities ) { + for ( EntityToSaveMessage message : entities ) { try { final MutationBatch entityRewrite = migration.to.write(message.scope, message.entity); @@ -197,17 +203,14 @@ public class 
MvccEntityDataMigrationImpl implements DataMigration{ // time with // no TTL so that cleanup can clean up // older values + + for (final Field field : EntityUtils.getUniqueFields(message.entity.getEntity().get())) { final UniqueValue written = new UniqueValueImpl(field, entityId, version); - final MutationBatch mb = uniqueValueSerializationStrategy.write(message.scope, written); - + uniqueBatch.add(uniqueValueSerializationStrategy.writeCQL(message.scope, written, -1)); - // merge into our - // existing mutation - // batch - totalBatch.mergeShallow(mb); } @@ -232,7 +235,7 @@ public class MvccEntityDataMigrationImpl implements DataMigration{ } - executeBatch( migration.to.getImplementationVersion(), totalBatch, observer, atomicLong ); + executeBatch( migration.to.getImplementationVersion(), totalBatch, observer, atomicLong, uniqueBatch ); //now run our cleanup task @@ -252,10 +255,13 @@ public class MvccEntityDataMigrationImpl implements DataMigration{ } - protected void executeBatch( final int targetVersion, final MutationBatch batch, final ProgressObserver po, - final AtomicLong count ) { + protected void executeBatch(final int targetVersion, final MutationBatch batch, final ProgressObserver po, + final AtomicLong count, com.datastax.driver.core.BatchStatement uniqueBatch) { try { + batch.execute(); + session.execute(uniqueBatch); + po.update( targetVersion, "Finished copying " + count + " entities to the new format" ); } http://git-wip-us.apache.org/repos/asf/usergrid/blob/0c609878/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkCommitTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkCommitTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkCommitTest.java index 
ad6eac6..b18b095 100644 --- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkCommitTest.java +++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkCommitTest.java @@ -1,7 +1,13 @@ package org.apache.usergrid.persistence.collection.mvcc.stage.delete; +import com.datastax.driver.core.Session; +import com.google.inject.Inject; +import org.apache.usergrid.persistence.collection.guice.TestCollectionModule; +import org.apache.usergrid.persistence.core.test.ITRunner; +import org.apache.usergrid.persistence.core.test.UseModules; import org.junit.Test; +import org.junit.runner.RunWith; import org.mockito.ArgumentCaptor; import org.apache.usergrid.persistence.collection.MvccEntity; @@ -32,6 +38,9 @@ import static org.mockito.Mockito.when; /** @author tnine */ public class MarkCommitTest extends AbstractMvccEntityStageTest { + @Inject + + /** Standard flow */ @Test public void testStartStage() throws Exception { @@ -39,6 +48,8 @@ public class MarkCommitTest extends AbstractMvccEntityStageTest { final ApplicationScope context = mock( ApplicationScope.class ); + final Session session = mock(Session.class); + //mock returning a mock mutation when we do a log entry write final MvccLogEntrySerializationStrategy logStrategy = mock( MvccLogEntrySerializationStrategy.class ); @@ -71,7 +82,7 @@ public class MarkCommitTest extends AbstractMvccEntityStageTest { //run the stage - WriteCommit newStage = new WriteCommit( logStrategy, mvccEntityStrategy, uniqueValueStrategy ); + WriteCommit newStage = new WriteCommit( logStrategy, mvccEntityStrategy, uniqueValueStrategy, session ); //verify the observable is correct http://git-wip-us.apache.org/repos/asf/usergrid/blob/0c609878/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommitTest.java 
---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommitTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommitTest.java index 58642d3..60281d4 100644 --- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommitTest.java +++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommitTest.java @@ -18,7 +18,13 @@ package org.apache.usergrid.persistence.collection.mvcc.stage.write; +import com.datastax.driver.core.Session; +import com.google.inject.Inject; +import org.apache.usergrid.persistence.collection.guice.TestCollectionModule; +import org.apache.usergrid.persistence.core.test.ITRunner; +import org.apache.usergrid.persistence.core.test.UseModules; import org.junit.Test; +import org.junit.runner.RunWith; import org.mockito.ArgumentCaptor; import org.apache.usergrid.persistence.collection.MvccEntity; @@ -53,6 +59,8 @@ public class WriteCommitTest extends AbstractMvccEntityStageTest { final ApplicationScope context = mock( ApplicationScope.class ); + final Session session = mock(Session.class); + //mock returning a mock mutation when we do a log entry write final MvccLogEntrySerializationStrategy logStrategy = mock( MvccLogEntrySerializationStrategy.class ); @@ -84,7 +92,7 @@ public class WriteCommitTest extends AbstractMvccEntityStageTest { //run the stage - WriteCommit newStage = new WriteCommit( logStrategy, mvccEntityStrategy, uniqueValueStrategy ); + WriteCommit newStage = new WriteCommit( logStrategy, mvccEntityStrategy, uniqueValueStrategy, session ); Entity result = newStage.call( new CollectionIoEvent<MvccEntity>( context, mvccEntityInput ) ).getEvent().getEntity().get(); @@ -116,6 +124,9 @@ public class 
WriteCommitTest extends AbstractMvccEntityStageTest { /** * Write up mock mutations so we don't npe on the our operations, but rather on the input */ + + final Session session = mock(Session.class); + final MvccLogEntrySerializationStrategy logStrategy = mock( MvccLogEntrySerializationStrategy.class ); final MutationBatch logMutation = mock( MutationBatch.class ); @@ -131,7 +142,7 @@ public class WriteCommitTest extends AbstractMvccEntityStageTest { when( mvccEntityStrategy.write( any( ApplicationScope.class ), any( MvccEntity.class ) ) ) .thenReturn( entityMutation ); - new WriteCommit( logStrategy, mvccEntityStrategy, uniqueValueStrategy ).call( event ); + new WriteCommit( logStrategy, mvccEntityStrategy, uniqueValueStrategy, session ).call( event ); } } http://git-wip-us.apache.org/repos/asf/usergrid/blob/0c609878/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerifyTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerifyTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerifyTest.java index 09876fb..3ddc14d 100644 --- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerifyTest.java +++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerifyTest.java @@ -18,6 +18,7 @@ package org.apache.usergrid.persistence.collection.mvcc.stage.write; +import com.datastax.driver.core.Session; import org.junit.Rule; import org.junit.Test; import org.junit.runner.RunWith; @@ -59,7 +60,8 @@ public class WriteUniqueVerifyTest { @Rule public MigrationManagerRule migrationManagerRule; - + @Inject + private Session session; @Inject 
private SerializationFig fig; @@ -82,7 +84,7 @@ public class WriteUniqueVerifyTest { final MvccEntity mvccEntity = fromEntity( entity ); // run the stage - WriteUniqueVerify newStage = new WriteUniqueVerify( uvstrat, fig, keyspace,cassandraConfig ); + WriteUniqueVerify newStage = new WriteUniqueVerify( uvstrat, fig, keyspace,cassandraConfig, session ); newStage.call( new CollectionIoEvent<>( collectionScope, mvccEntity ) ) ; http://git-wip-us.apache.org/repos/asf/usergrid/blob/0c609878/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImplTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImplTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImplTest.java index fcf22cf..3ffdb65 100644 --- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImplTest.java +++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImplTest.java @@ -23,6 +23,8 @@ import java.util.Collections; import java.util.Iterator; import java.util.UUID; +import com.datastax.driver.core.BatchStatement; +import com.datastax.driver.core.Session; import org.junit.Assert; import org.junit.Before; import org.junit.Rule; @@ -64,6 +66,8 @@ public abstract class UniqueValueSerializationStrategyImplTest { @Rule public MigrationManagerRule migrationManagerRule; + @Inject + private Session session; private UniqueValueSerializationStrategy strategy; @@ -91,7 +95,9 @@ public abstract class UniqueValueSerializationStrategyImplTest { Id entityId = new SimpleId( 
UUIDGenerator.newTimeUUID(), "entity" ); UUID version = UUIDGenerator.newTimeUUID(); UniqueValue stored = new UniqueValueImpl( field, entityId, version ); - strategy.write( scope, stored ).execute(); + //strategy.write( scope, stored ).execute(); + BatchStatement batch = strategy.writeCQL(scope, stored, -1); + session.execute(batch); UniqueValueSet fields = strategy.load( scope, entityId.getType(), Collections.<Field>singleton( field ) ); @@ -127,7 +133,9 @@ public abstract class UniqueValueSerializationStrategyImplTest { Id entityId = new SimpleId( UUIDGenerator.newTimeUUID(), "entity" ); UUID version = UUIDGenerator.newTimeUUID(); UniqueValue stored = new UniqueValueImpl( field, entityId, version ); - strategy.write( scope, stored, 5 ).execute(); + //strategy.write( scope, stored, 5 ).execute(); + BatchStatement batch = strategy.writeCQL(scope, stored, 5); + session.execute(batch); Thread.sleep( 1000 ); @@ -179,7 +187,10 @@ public abstract class UniqueValueSerializationStrategyImplTest { Id entityId = new SimpleId( UUIDGenerator.newTimeUUID(), "entity" ); UUID version = UUIDGenerator.newTimeUUID(); UniqueValue stored = new UniqueValueImpl( field, entityId, version ); - strategy.write( scope, stored ).execute(); + + //strategy.write( scope, stored ).execute(); + BatchStatement batch = strategy.writeCQL( scope, stored, -1); + session.execute(batch); strategy.delete( scope, stored ).execute(); @@ -207,8 +218,9 @@ public abstract class UniqueValueSerializationStrategyImplTest { Id entityId = new SimpleId( UUIDGenerator.newTimeUUID(), "entity" ); UUID version = UUIDGenerator.newTimeUUID(); UniqueValue stored = new UniqueValueImpl( field, entityId, version ); - strategy.write( scope, stored ).execute(); - + //strategy.write( scope, stored ).execute(); + BatchStatement batch = strategy.writeCQL( scope, stored, -1); + session.execute(batch); UniqueValueSet fields = strategy.load( scope, entityId.getType(), Collections.<Field>singleton( field ) ); @@ -278,9 +290,13 @@ 
public abstract class UniqueValueSerializationStrategyImplTest { UniqueValue version1Field1Value = new UniqueValueImpl( version1Field1, entityId, version1 ); UniqueValue version1Field2Value = new UniqueValueImpl( version1Field2, entityId, version1 ); - final MutationBatch batch = strategy.write( scope, version1Field1Value ); - batch.mergeShallow( strategy.write( scope, version1Field2Value ) ); + //final MutationBatch batch = strategy.write( scope, version1Field1Value ); + //batch.mergeShallow( strategy.write( scope, version1Field2Value ) ); + final BatchStatement batch = new BatchStatement(); + + batch.add(strategy.writeCQL( scope, version1Field1Value, -1)); + batch.add(strategy.writeCQL( scope, version1Field2Value, -1)); //write V2 of everything final UUID version2 = UUIDGenerator.newTimeUUID(); @@ -292,10 +308,15 @@ public abstract class UniqueValueSerializationStrategyImplTest { UniqueValue version2Field1Value = new UniqueValueImpl( version2Field1, entityId, version2 ); UniqueValue version2Field2Value = new UniqueValueImpl( version2Field2, entityId, version2 ); - batch.mergeShallow( strategy.write( scope, version2Field1Value ) ); - batch.mergeShallow( strategy.write( scope, version2Field2Value ) ); + //batch.mergeShallow( strategy.write( scope, version2Field1Value ) ); + //batch.mergeShallow( strategy.write( scope, version2Field2Value ) ); + + batch.add(strategy.writeCQL( scope, version2Field1Value, -1)); + batch.add(strategy.writeCQL( scope, version2Field2Value, -1)); + + session.execute(batch); - batch.execute(); + //batch.execute(); UniqueValueSet fields = strategy.load( scope, entityId.getType(), Arrays.<Field>asList( version1Field1, version1Field2 ) );
