http://git-wip-us.apache.org/repos/asf/usergrid/blob/b23c20a2/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImpl.java ---------------------------------------------------------------------- diff --cc stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImpl.java index 5320152,8c1f2d2..0753281 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImpl.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImpl.java @@@ -18,34 -18,40 +18,38 @@@ package org.apache.usergrid.persistence.collection.serialization.impl; ++ +import java.nio.ByteBuffer; import java.util.*; -import com.netflix.astyanax.util.RangeBuilder; +import com.datastax.driver.core.*; +import com.datastax.driver.core.Row; +import com.datastax.driver.core.querybuilder.Clause; +import com.datastax.driver.core.querybuilder.QueryBuilder; +import com.datastax.driver.core.querybuilder.Using; +import org.apache.usergrid.persistence.core.CassandraConfig; ++import org.apache.usergrid.persistence.core.astyanax.MultiTenantColumnFamilyDefinition; +import org.apache.usergrid.persistence.core.datastax.TableDefinition; +import org.apache.usergrid.persistence.model.entity.SimpleId; +import org.apache.usergrid.persistence.model.field.*; ++ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.cassandra.db.marshal.BytesType; -- import org.apache.usergrid.persistence.collection.serialization.SerializationFig; import org.apache.usergrid.persistence.collection.serialization.UniqueValue; import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy; import org.apache.usergrid.persistence.collection.serialization.UniqueValueSet; -import org.apache.usergrid.persistence.core.astyanax.CassandraFig; -import org.apache.usergrid.persistence.core.astyanax.ColumnNameIterator; -import org.apache.usergrid.persistence.core.astyanax.ColumnParser; -import org.apache.usergrid.persistence.core.astyanax.ColumnTypes; -import org.apache.usergrid.persistence.core.astyanax.MultiTenantColumnFamily; -import org.apache.usergrid.persistence.core.astyanax.MultiTenantColumnFamilyDefinition; -import org.apache.usergrid.persistence.core.astyanax.ScopedRowKey; +import org.apache.usergrid.persistence.core.CassandraFig; - import org.apache.usergrid.persistence.core.astyanax.MultiTenantColumnFamilyDefinition; import org.apache.usergrid.persistence.core.scope.ApplicationScope; import org.apache.usergrid.persistence.core.util.ValidationUtils; import org.apache.usergrid.persistence.model.entity.Id; -import org.apache.usergrid.persistence.model.field.Field; import com.google.common.base.Preconditions; + + import com.netflix.astyanax.ColumnListMutation; -import com.netflix.astyanax.Keyspace; -import com.netflix.astyanax.MutationBatch; -import com.netflix.astyanax.connectionpool.exceptions.ConnectionException; -import com.netflix.astyanax.model.Column; -import com.netflix.astyanax.model.ConsistencyLevel; -import com.netflix.astyanax.model.Row; -import com.netflix.astyanax.query.RowQuery; ++ + /** * Reads and writes to UniqueValues column family. @@@ -53,27 -59,25 +57,29 @@@ public abstract class UniqueValueSerializationStrategyImpl<FieldKey, EntityKey> implements UniqueValueSerializationStrategy { - private static final Logger log = LoggerFactory.getLogger( UniqueValueSerializationStrategyImpl.class ); + private static final Logger logger = LoggerFactory.getLogger( UniqueValueSerializationStrategyImpl.class ); + public static final String UUID_TYPE_REVERSED = "UUIDType(reversed=true)"; - private final MultiTenantColumnFamily<ScopedRowKey<FieldKey>, EntityVersion> - CF_UNIQUE_VALUES; + private final String TABLE_UNIQUE_VALUES; + private final String TABLE_UNIQUE_VALUES_LOG; + + private final Map COLUMNS_UNIQUE_VALUES; + private final Map COLUMNS_UNIQUE_VALUES_LOG; - private final MultiTenantColumnFamily<ScopedRowKey<EntityKey>, UniqueFieldEntry> - CF_ENTITY_UNIQUE_VALUE_LOG ; public static final int COL_VALUE = 0x0; + private final Comparator<UniqueValue> uniqueValueComparator = new UniqueValueComparator(); + private final SerializationFig serializationFig; - protected final Keyspace keyspace; private final CassandraFig cassandraFig; + private final Session session; + private final CassandraConfig cassandraConfig; + /** * Construct serialization strategy for keyspace. @@@ -186,134 -182,251 +192,285 @@@ final EntityVersion ev = new EntityVersion( entityId, entityVersion ); final UniqueFieldEntry uniqueFieldEntry = new UniqueFieldEntry( entityVersion, field ); - return doWrite( scope, value, new RowOp() { - @Override - public void doLookup( final ColumnListMutation<EntityVersion> colMutation ) { - colMutation.deleteColumn( ev ); - } + ByteBuffer partitionKey = getPartitionKey( scope.getApplication(), value.getEntityId().getType(), + value.getField().getTypeName().toString(), value.getField().getName(), value.getField().getValue()); + ByteBuffer columnValue = serializeUniqueValueColumn(ev); - @Override - public void doLog( final ColumnListMutation<UniqueFieldEntry> colMutation ) { - colMutation.deleteColumn( uniqueFieldEntry ); - } - } ); - } - - - /** - * Do the column update or delete for the given column and row key - * - * @param applicationScope We need to use this when getting the keyspace - * @param uniqueValue The unique value to write - * @param op The operation to write - */ - private MutationBatch doWrite( ApplicationScope applicationScope, UniqueValue uniqueValue, RowOp op ) { - final MutationBatch batch = keyspace.prepareMutationBatch(); + final Clause uniqueEqKey = QueryBuilder.eq("key", partitionKey ); + final Clause uniqueEqColumn = QueryBuilder.eq("column1", columnValue ); + Statement uniqueDelete = QueryBuilder.delete().from(TABLE_UNIQUE_VALUES).where(uniqueEqKey).and(uniqueEqColumn); + batch.add(uniqueDelete); - final Id applicationId = applicationScope.getApplication(); - final FieldKey fieldKey = createUniqueValueKey( applicationId, uniqueValue.getEntityId().getType(), uniqueValue.getField() ); + ByteBuffer logPartitionKey = getLogPartitionKey(scope.getApplication(), entityId); + ByteBuffer logColumnValue = serializeUniqueValueLogColumn(uniqueFieldEntry); - op.doLookup( batch.withRow( CF_UNIQUE_VALUES, ScopedRowKey.fromKey( applicationId, fieldKey ) ) ); + final Clause uniqueLogEqKey = QueryBuilder.eq("key", logPartitionKey ); + final Clause uniqueLogEqColumn = QueryBuilder.eq("column1", logColumnValue ); - final EntityKey entityKey = createEntityUniqueLogKey( applicationId, uniqueValue.getEntityId() ); + Statement uniqueLogDelete = QueryBuilder.delete() + .from(TABLE_UNIQUE_VALUES_LOG).where(uniqueLogEqKey).and( uniqueLogEqColumn); - op.doLog( batch.withRow( CF_ENTITY_UNIQUE_VALUE_LOG, - ScopedRowKey.fromKey( applicationId, entityKey ) ) ); + batch.add(uniqueLogDelete); + if ( logger.isTraceEnabled() ) { + logger.trace( "Building batch statement for unique value entity={} version={} name={} value={} ", - uniqueValue.getEntityId().getUuid(), uniqueValue.getEntityVersion(), - uniqueValue.getField().getName(), uniqueValue.getField().getValue() ); ++ value.getEntityId().getUuid(), value.getEntityVersion(), ++ value.getField().getName(), value.getField().getValue() ); + } + + + return batch; } @Override -- public UniqueValueSet load( final ApplicationScope colScope, final String type, final Collection<Field> fields ) - { - return load( colScope, ConsistencyLevel.valueOf( cassandraFig.getReadCl() ), type, fields ); - throws ConnectionException { - return load( colScope, ConsistencyLevel.valueOf( cassandraFig.getReadCL() ), type, fields, false); ++ public UniqueValueSet load( final ApplicationScope colScope, final String type, final Collection<Field> fields ) { ++ ++ return load( colScope, ConsistencyLevel.valueOf( cassandraFig.getReadCl() ), type, fields, false ); ++ } + @Override + public UniqueValueSet load( final ApplicationScope colScope, final String type, final Collection<Field> fields, - boolean useReadRepair) - throws ConnectionException { - return load( colScope, ConsistencyLevel.valueOf( cassandraFig.getReadCL() ), type, fields, useReadRepair); ++ boolean useReadRepair) { ++ ++ return load( colScope, ConsistencyLevel.valueOf( cassandraFig.getReadCl() ), type, fields, useReadRepair); ++ + } + + @Override - public UniqueValueSet load(final ApplicationScope appScope, final ConsistencyLevel consistencyLevel, - final String type, final Collection<Field> fields, boolean useReadRepair) throws ConnectionException { + public UniqueValueSet load( final ApplicationScope appScope, + final ConsistencyLevel consistencyLevel, - final String type, final Collection<Field> fields ) { ++ final String type, final Collection<Field> fields, boolean useReadRepair ) { ++ Preconditions.checkNotNull( fields, "fields are required" ); Preconditions.checkArgument( fields.size() > 0, "More than 1 field must be specified" ); - return loadCQL(appScope, consistencyLevel, type, fields); ++ return loadCQL(appScope, consistencyLevel, type, fields, useReadRepair); + + } + + + private UniqueValueSet loadCQL( final ApplicationScope appScope, + final ConsistencyLevel consistencyLevel, - final String type, final Collection<Field> fields ) { ++ final String type, final Collection<Field> fields, boolean useReadRepair ) { + + Preconditions.checkNotNull( fields, "fields are required" ); + Preconditions.checkArgument( fields.size() > 0, "More than 1 field must be specified" ); - final List<ScopedRowKey<FieldKey>> keys = new ArrayList<>( fields.size() ); final Id applicationId = appScope.getApplication(); - for ( Field field : fields ) { + // row key = app UUID + app type + entityType + field type + field name + field value - List<ByteBuffer> partitionKeys = new ArrayList<>( fields.size() ); - final FieldKey key = createUniqueValueKey( applicationId, type, field ); + + - final ScopedRowKey<FieldKey> rowKey = - ScopedRowKey.fromKey( applicationId, key ); + - keys.add( rowKey ); - } ++ //List<ByteBuffer> partitionKeys = new ArrayList<>( fields.size() ); + + final UniqueValueSetImpl uniqueValueSet = new UniqueValueSetImpl( fields.size() ); + - Iterator<Row<ScopedRowKey<FieldKey>, EntityVersion>> results = - keyspace.prepareQuery( CF_UNIQUE_VALUES ).setConsistencyLevel( consistencyLevel ).getKeySlice( keys ) - .withColumnRange(new RangeBuilder().setLimit(serializationFig.getMaxLoadSize()).build()) - .execute().getResult().iterator(); + - if( !results.hasNext()){ - if(logger.isTraceEnabled()){ - logger.trace("No partitions returned for unique value lookup"); - } - } + for ( Field field : fields ) { + + //log.info(Bytes.toHexString(getPartitionKey(applicationId, type, field.getTypeName().toString(), field.getName(), field.getValue()))); - partitionKeys.add(getPartitionKey(applicationId, type, field.getTypeName().toString(), field.getName(), field.getValue())); ++ //partitionKeys.add(getPartitionKey(applicationId, type, field.getTypeName().toString(), field.getName(), field.getValue())); - } - while ( results.hasNext() ) ++ final Clause inKey = QueryBuilder.in("key", getPartitionKey(applicationId, type, ++ field.getTypeName().toString(), field.getName(), field.getValue()) ); - final UniqueValueSetImpl uniqueValueSet = new UniqueValueSetImpl( fields.size() ); - { ++ final Statement statement = QueryBuilder.select().all().from(TABLE_UNIQUE_VALUES) ++ .where(inKey) ++ .setConsistencyLevel(consistencyLevel); - final Clause inKey = QueryBuilder.in("key", partitionKeys ); - final Row<ScopedRowKey<FieldKey>, EntityVersion> unique = results.next(); ++ final ResultSet resultSet = session.execute(statement); - final Statement statement = QueryBuilder.select().all().from(TABLE_UNIQUE_VALUES) - .where(inKey) - .setConsistencyLevel(consistencyLevel); - final Field field = parseRowKey( unique.getKey() ); - final ResultSet resultSet = session.execute(statement); - final Iterator<Column<EntityVersion>> columnList = unique.getColumns().iterator(); ++ Iterator<com.datastax.driver.core.Row> results = resultSet.iterator(); - //sanity check, nothing to do, skip it - if ( !columnList.hasNext() ) { ++ if( !results.hasNext()){ + if(logger.isTraceEnabled()){ - logger.trace("No cells exist in partition for unique value [{}={}]", - field.getName(), field.getValue().toString()); ++ logger.trace("No rows returned for unique value lookup of field: {}", field); + } - continue; + } - Iterator<com.datastax.driver.core.Row> results = resultSet.iterator(); + List<UniqueValue> candidates = new ArrayList<>(); - while( results.hasNext() ){ - /** - * While iterating the columns, a rule is being enforced to only EVER return the oldest UUID. This means - * the UUID with the oldest timestamp ( it was the original entity written for the unique value ). - * - * We do this to prevent cycling of unique value -> entity UUID mappings as this data is ordered by the - * entity's version and not the entity's timestamp itself. - * - * If newer entity UUIDs are encountered, they are removed from the unique value tables, however their - * backing serialized entity data is left in tact in case a cleanup / audit is later needed. - */ - while (columnList.hasNext()) { ++ while( results.hasNext() ){ + - final com.datastax.driver.core.Row unique = results.next(); - ByteBuffer partitionKey = unique.getBytes("key"); - ByteBuffer column = unique.getBytesUnsafe("column1"); ++ final com.datastax.driver.core.Row unique = results.next(); ++ ByteBuffer partitionKey = unique.getBytes("key"); ++ ByteBuffer column = unique.getBytesUnsafe("column1"); + - List<Object> keyContents = deserializePartitionKey(partitionKey); - List<Object> columnContents = deserializeUniqueValueColumn(column); ++ List<Object> keyContents = deserializePartitionKey(partitionKey); ++ List<Object> columnContents = deserializeUniqueValueColumn(column); + - FieldTypeName fieldType; - String name; - String value; - if(this instanceof UniqueValueSerializationStrategyV2Impl) { ++ FieldTypeName fieldType; ++ String name; ++ String value; ++ if(this instanceof UniqueValueSerializationStrategyV2Impl) { + - fieldType = FieldTypeName.valueOf((String) keyContents.get(3)); - name = (String) keyContents.get(4); - value = (String) keyContents.get(5); + - }else{ ++ fieldType = FieldTypeName.valueOf((String) keyContents.get(3)); ++ name = (String) keyContents.get(4); ++ value = (String) keyContents.get(5); + - fieldType = FieldTypeName.valueOf((String) keyContents.get(5)); - name = (String) keyContents.get(6); - value = (String) keyContents.get(7); ++ }else{ + - } + - Field field = getField(name, value, fieldType); ++ fieldType = FieldTypeName.valueOf((String) keyContents.get(5)); ++ name = (String) keyContents.get(6); ++ value = (String) keyContents.get(7); ++ ++ ++ } ++ ++ Field returnedField = getField(name, value, fieldType); ++ ++ ++ final EntityVersion entityVersion = new EntityVersion( ++ new SimpleId((UUID)columnContents.get(1), (String)columnContents.get(2)), (UUID)columnContents.get(0)); ++// //sanity check, nothing to do, skip it ++// if ( !columnList.hasNext() ) { ++// if(logger.isTraceEnabled()){ ++// logger.trace("No cells exist in partition for unique value [{}={}]", ++// field.getName(), field.getValue().toString()); ++// } ++// continue; ++// } ++ ++ ++ ++ ++ /** ++ * While iterating the rows, a rule is enforced to only EVER return the oldest UUID for the field. ++ * This means the UUID with the oldest timestamp ( it was the original entity written for ++ * the unique value ). ++ * ++ * We do this to prevent cycling of unique value -> entity UUID mappings as this data is ordered by the ++ * entity's version and not the entity's timestamp itself. ++ * ++ * If newer entity UUIDs are encountered, they are removed from the unique value tables, however their ++ * backing serialized entity data is left in tact in case a cleanup / audit is later needed. ++ */ + - final EntityVersion entityVersion = columnList.next().getName(); + + final UniqueValue uniqueValue = - new UniqueValueImpl(field, entityVersion.getEntityId(), entityVersion.getEntityVersion()); ++ new UniqueValueImpl(returnedField, entityVersion.getEntityId(), entityVersion.getEntityVersion()); + + // set the initial candidate and move on + if (candidates.size() == 0) { + candidates.add(uniqueValue); + if (logger.isTraceEnabled()) { + logger.trace("First entry for unique value [{}={}] found for application [{}], adding " + + "entry with entity id [{}] and entity version [{}] to the candidate list and continuing", - field.getName(), field.getValue().toString(), applicationId.getType(), ++ returnedField.getName(), returnedField.getValue().toString(), applicationId.getType(), + uniqueValue.getEntityId().getUuid(), uniqueValue.getEntityVersion()); + } - final EntityVersion entityVersion = new EntityVersion( - new SimpleId((UUID)columnContents.get(1), (String)columnContents.get(2)), (UUID)columnContents.get(0)); + continue; + } + if(!useReadRepair){ - final UniqueValueImpl uniqueValue = - new UniqueValueImpl( field, entityVersion.getEntityId(), entityVersion.getEntityVersion() ); + // take only the first + if (logger.isTraceEnabled()) { + logger.trace("Read repair not enabled for this request of unique value [{}={}], breaking out" + - " of cell loop", field.getName(), field.getValue().toString()); ++ " of cell loop", returnedField.getName(), returnedField.getValue().toString()); + } + break; + + } else { + + + final int result = uniqueValueComparator.compare(uniqueValue, candidates.get(candidates.size() - 1)); + + if (result == 0) { + + // do nothing, only versions can be newer and we're not worried about newer versions of same entity + if (logger.isTraceEnabled()) { + logger.trace("Current unique value [{}={}] entry has UUID [{}] equal to candidate UUID [{}]", - field.getName(), field.getValue().toString(), uniqueValue.getEntityId().getUuid(), ++ returnedField.getName(), returnedField.getValue().toString(), uniqueValue.getEntityId().getUuid(), + candidates.get(candidates.size() -1)); + } + + // update candidate w/ latest version + candidates.add(uniqueValue); + + } else if (result < 0) { + + // delete the duplicate from the unique value index + candidates.forEach(candidate -> { + - try { - - logger.warn("Duplicate unique value [{}={}] found for application [{}], removing newer " + - "entry with entity id [{}] and entity version [{}]", field.getName(), - field.getValue().toString(), applicationId.getUuid(), - candidate.getEntityId().getUuid(), candidate.getEntityVersion()); - - delete(appScope, candidate).execute(); ++ logger.warn("Duplicate unique value [{}={}] found for application [{}], removing newer " + ++ "entry with entity id [{}] and entity version [{}]", returnedField.getName(), ++ returnedField.getValue().toString(), applicationId.getUuid(), ++ candidate.getEntityId().getUuid(), candidate.getEntityVersion()); + - } catch (ConnectionException e) { - logger.error( "Unable to connect to cassandra during duplicate repair of [{}={}]", - field.getName(), field.getValue().toString() ); - } ++ session.execute(deleteCQL(appScope, candidate)); + + }); + + // clear the transient candidates list + candidates.clear(); + + if (logger.isTraceEnabled()) { + logger.trace("Updating candidate unique value [{}={}] to entity id [{}] and " + - "entity version [{}]", field.getName(), field.getValue().toString(), ++ "entity version [{}]", returnedField.getName(), returnedField.getValue().toString(), + uniqueValue.getEntityId().getUuid(), uniqueValue.getEntityVersion()); + + } + + // add our new candidate to the list + candidates.add(uniqueValue); + + + } else { + + logger.warn("Duplicate unique value [{}={}] found for application [{}], removing newer entry " + - "with entity id [{}] and entity version [{}].", field.getName(), field.getValue().toString(), ++ "with entity id [{}] and entity version [{}].", returnedField.getName(), returnedField.getValue().toString(), + applicationId.getUuid(), uniqueValue.getEntityId().getUuid(), uniqueValue.getEntityVersion()); + + // delete the duplicate from the unique value index - delete(appScope, uniqueValue).execute(); ++ session.execute(deleteCQL(appScope, uniqueValue)); + + + } + + } - } + - // take the last candidate ( should be the latest version) and add to the result set ++ } + - final UniqueValue returnValue = candidates.get(candidates.size() -1); - if(logger.isTraceEnabled()){ - logger.trace("Adding unique value [{}={}] with entity id [{}] and entity version [{}] to response set", - returnValue.getField().getName(), returnValue.getField().getValue().toString(), - returnValue.getEntityId().getUuid(), returnValue.getEntityVersion()); ++ if ( candidates.size() > 0 ) { ++ // take the last candidate ( should be the latest version) and add to the result set ++ final UniqueValue returnValue = candidates.get(candidates.size() - 1); ++ if (logger.isTraceEnabled()) { ++ logger.trace("Adding unique value [{}={}] with entity id [{}] and entity version [{}] to response set", ++ returnValue.getField().getName(), returnValue.getField().getValue().toString(), ++ returnValue.getEntityId().getUuid(), returnValue.getEntityVersion()); ++ } ++ uniqueValueSet.addValue(returnValue); + } - uniqueValueSet.addValue(returnValue); + - uniqueValueSet.addValue(uniqueValue); } ++ return uniqueValueSet; + } @@@ -366,103 -471,101 +523,127 @@@ /** - * Converts raw columns to the expected output + * Get the CQL table definition for the unique values log table */ - private static final class UniqueEntryParser implements ColumnParser<UniqueFieldEntry, UniqueValue> { + protected abstract TableDefinition getEntityUniqueLogTable(); + + public class AllUniqueFieldsIterator implements Iterable<UniqueValue>, Iterator<UniqueValue> { + + private final Session session; + private final Statement query; private final Id entityId; + private Iterator<Row> sourceIterator; - private UniqueEntryParser( final Id entityId ) {this.entityId = entityId;} - @Override - public UniqueValue parseColumn( final Column<UniqueFieldEntry> column ) { - final UniqueFieldEntry entry = column.getName(); + public AllUniqueFieldsIterator( final Session session, final Statement query, final Id entityId){ + + this.session = session; + this.query = query; + this.entityId = entityId; - return new UniqueValueImpl( entry.getField(), entityId, entry.getVersion() ); } - } - @Override - public Collection<MultiTenantColumnFamilyDefinition> getColumnFamilies() { + @Override + public Iterator<UniqueValue> iterator() { + return this; + } - final MultiTenantColumnFamilyDefinition uniqueLookupCF = - new MultiTenantColumnFamilyDefinition( CF_UNIQUE_VALUES, BytesType.class.getSimpleName(), - ColumnTypes.DYNAMIC_COMPOSITE_TYPE, BytesType.class.getSimpleName(), - MultiTenantColumnFamilyDefinition.CacheOption.KEYS ); + @Override + public boolean hasNext() { - final MultiTenantColumnFamilyDefinition uniqueLogCF = - new MultiTenantColumnFamilyDefinition( CF_ENTITY_UNIQUE_VALUE_LOG, BytesType.class.getSimpleName(), - ColumnTypes.DYNAMIC_COMPOSITE_TYPE, BytesType.class.getSimpleName(), - MultiTenantColumnFamilyDefinition.CacheOption.KEYS ); + if ( sourceIterator == null ) { - return Arrays.asList( uniqueLookupCF, uniqueLogCF ); - } + advanceIterator(); + return sourceIterator.hasNext(); + } - /** - * Get the column family for the unique fields - */ - protected abstract MultiTenantColumnFamily<ScopedRowKey<FieldKey>, EntityVersion> getUniqueValuesCF(); + return sourceIterator.hasNext(); + } + @Override + public UniqueValue next() { - /** - * Generate a key that is compatible with the column family - * - * @param applicationId The applicationId - * @param type The type in the field - * @param field The field we're creating the key for - */ - protected abstract FieldKey createUniqueValueKey(final Id applicationId, final String type, final Field field ); + com.datastax.driver.core.Row next = sourceIterator.next(); - /** - * Parse the row key into the field - * @param rowKey - * @return - */ - protected abstract Field parseRowKey(final ScopedRowKey<FieldKey> rowKey); + ByteBuffer column = next.getBytesUnsafe("column1"); + List<Object> columnContents = deserializeUniqueValueLogColumn(column); - /** - * Get the column family for the unique field CF - */ - protected abstract MultiTenantColumnFamily<ScopedRowKey<EntityKey>, UniqueFieldEntry> getEntityUniqueLogCF(); + UUID version = (UUID) columnContents.get(0); + String name = (String) columnContents.get(1); + String value = (String) columnContents.get(2); + FieldTypeName fieldType = FieldTypeName.valueOf((String) columnContents.get(3)); - /** - * Generate a key that is compatible with the column family - * - * @param applicationId The applicationId - * @param uniqueValueId The uniqueValue - */ - protected abstract EntityKey createEntityUniqueLogKey(final Id applicationId, final Id uniqueValueId ); + return new UniqueValueImpl(getField(name, value, fieldType), entityId, version); + + } + + private void advanceIterator() { + + sourceIterator = session.execute(query).iterator(); + } + } + + private Field getField( String name, String value, FieldTypeName fieldType){ + + Field field = null; + + switch ( fieldType ) { + case BOOLEAN: + field = new BooleanField( name, Boolean.parseBoolean( value ) ); + break; + case DOUBLE: + field = new DoubleField( name, Double.parseDouble( value ) ); + break; + case FLOAT: + field = new FloatField( name, Float.parseFloat( value ) ); + break; + case INTEGER: + field = new IntegerField( name, Integer.parseInt( value ) ); + break; + case LONG: + field = new LongField( name, Long.parseLong( value ) ); + break; + case STRING: + field = new StringField( name, value ); + break; + case UUID: + field = new UUIDField( name, UUID.fromString( value ) ); + break; + } + + return field; + + } + + private class UniqueValueComparator implements Comparator<UniqueValue> { + + @Override + public int compare(UniqueValue o1, UniqueValue o2) { + + if( o1.getEntityId().getUuid().equals(o2.getEntityId().getUuid())){ + + return 0; + + }else if( o1.getEntityId().getUuid().timestamp() < o2.getEntityId().getUuid().timestamp()){ + + return -1; + + } + + // if the UUIDs are not equal and o1's timestamp is not less than o2's timestamp, + // then o1 must be greater than o2 + return 1; + + + } + } + }
http://git-wip-us.apache.org/repos/asf/usergrid/blob/b23c20a2/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyProxyImpl.java ---------------------------------------------------------------------- diff --cc stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyProxyImpl.java index 4b5653f,f971b23..61f0f80 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyProxyImpl.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyProxyImpl.java @@@ -91,10 -113,23 +91,24 @@@ public class UniqueValueSerializationSt return migration.to.load( applicationScope, type, fields ); } + @Override + public UniqueValueSet load( final ApplicationScope applicationScope, final String type, - final Collection<Field> fields, boolean useReadRepair ) throws ConnectionException { ++ final Collection<Field> fields, boolean useReadRepair ) { + + final MigrationRelationship<UniqueValueSerializationStrategy> migration = getMigrationRelationShip(); + + if ( migration.needsMigration() ) { + return migration.from.load( applicationScope, type, fields, useReadRepair ); + } + + return migration.to.load( applicationScope, type, fields, useReadRepair ); + } + @Override - public UniqueValueSet load(final ApplicationScope applicationScope, final ConsistencyLevel consistencyLevel, - final String type, final Collection<Field> fields, boolean useReadRepair) throws ConnectionException { + public UniqueValueSet load( final ApplicationScope applicationScope, final ConsistencyLevel consistencyLevel, - final String type, final Collection<Field> fields ) { ++ final String type, final Collection<Field> fields, boolean useReadRepair ) { ++ final MigrationRelationship<UniqueValueSerializationStrategy> migration = getMigrationRelationShip(); http://git-wip-us.apache.org/repos/asf/usergrid/blob/b23c20a2/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyV1Impl.java ---------------------------------------------------------------------- diff --cc stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyV1Impl.java index d305044,6a1cb58..55ba011 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyV1Impl.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyV1Impl.java @@@ -133,280 -114,40 +133,286 @@@ public class UniqueValueSerializationSt @Override - protected CollectionPrefixedKey<Field> createUniqueValueKey( final Id applicationId, - final String type, final Field field) { + protected List<Object> deserializePartitionKey(ByteBuffer bb){ + + + /** + * List<Object> keys = new ArrayList<>(8); + keys.add(0, appUUID); + keys.add(1, applicationType); + keys.add(2, appUUID); + keys.add(3, applicationType); + keys.add(4, entityType); + keys.add(5, fieldType); + keys.add(6, fieldName); + keys.add(7, fieldValueString); + + */ + + int count = 0; + List<Object> stuff = new ArrayList<>(); + while(bb.hasRemaining()){ + ByteBuffer data = CQLUtils.getWithShortLength(bb); + if(count == 0 || count == 2){ + stuff.add(DataType.uuid().deserialize(data.slice(), ProtocolVersion.NEWEST_SUPPORTED)); + }else{ + stuff.add(DataType.text().deserialize(data.slice(), ProtocolVersion.NEWEST_SUPPORTED)); + } + byte equality = bb.get(); // we don't use this but take the equality byte off the buffer + count++; + } + + return stuff; + + } + + @Override + protected ByteBuffer serializeUniqueValueLogColumn(UniqueFieldEntry fieldEntry){ + + /** + * final UUID version = value.getVersion(); + final Field<?> field = value.getField(); + + final FieldTypeName fieldType = field.getTypeName(); + final String fieldValue = field.getValue().toString().toLowerCase(); + + + DynamicComposite composite = new DynamicComposite( ); + + //we want to sort ascending to descending by version + composite.addComponent( version, UUID_SERIALIZER, ColumnTypes.UUID_TYPE_REVERSED); + composite.addComponent( field.getName(), STRING_SERIALIZER ); + composite.addComponent( fieldValue, STRING_SERIALIZER ); + composite.addComponent( fieldType.name() , STRING_SERIALIZER); + */ + + // values are serialized as strings, not sure why, and always lower cased + String fieldValueString = fieldEntry.getField().getValue().toString().toLowerCase(); + + + List<Object> keys = new ArrayList<>(4); + keys.add(fieldEntry.getVersion()); + keys.add(fieldEntry.getField().getName()); + keys.add(fieldValueString); + keys.add(fieldEntry.getField().getTypeName().name()); + + String comparator = UUID_TYPE_REVERSED; + + int size = 16+fieldEntry.getField().getName().length()+fieldEntry.getField().getValue().toString().length()+ + fieldEntry.getField().getTypeName().name().length(); + + // we always need to add length for the 2 byte comparator short, 2 byte length short and 1 byte equality + size += keys.size()*5; + + // uuid type comparator is longest, ensure we allocate buffer using the max size to avoid overflow + size += keys.size()*comparator.length(); + + ByteBuffer stuff = ByteBuffer.allocate(size); + + + for (Object key : keys) { + if(key.equals(fieldEntry.getVersion())) { + int p = comparator.indexOf("(reversed=true)"); + boolean desc = false; + if (p >= 0) { + comparator = comparator.substring(0, p); + desc = true; + } - final String collectionName = LegacyScopeUtils.getCollectionScopeNameFromEntityType( type ); + byte a = (byte) 85; // this is the byte value for UUIDType in astyanax used in legacy data + if (desc) { + a = (byte) Character.toUpperCase((char) a); + } + stuff.putShort((short) ('è' | a)); + }else{ + comparator = "UTF8Type"; // only strings are being serialized other than UUIDs here + stuff.putShort((short)comparator.length()); + stuff.put(DataType.serializeValue(comparator, ProtocolVersion.NEWEST_SUPPORTED)); + } - final CollectionPrefixedKey<Field> uniquePrefixedKey = - new CollectionPrefixedKey<>( collectionName, applicationId, field ); + ByteBuffer kb = DataType.serializeValue(key, ProtocolVersion.NEWEST_SUPPORTED); + if (kb == null) { + kb = ByteBuffer.allocate(0); + } + + // put a short that indicates how big the buffer is for this item + stuff.putShort((short) kb.remaining()); + + // put the actual item + stuff.put(kb.slice()); + + // put an equality byte ( again not used by part of legacy thrift Astyanax schema) + stuff.put((byte) 0); + + + } + + stuff.flip(); + return stuff.duplicate(); - return uniquePrefixedKey; } + @Override + protected ByteBuffer getPartitionKey(Id applicationId, String entityType, String fieldType, String fieldName, Object fieldValue ){ + + return serializeKey(applicationId.getUuid(), applicationId.getType(), + entityType, fieldType, fieldName, fieldValue); + + } + + @Override + protected ByteBuffer getLogPartitionKey(final Id applicationId, final Id uniqueValueId){ + + return serializeLogKey(applicationId.getUuid(), applicationId.getType(), + uniqueValueId.getUuid(), uniqueValueId.getType()); + + } @Override - protected Field parseRowKey( final ScopedRowKey<CollectionPrefixedKey<Field>> rowKey ) { - return rowKey.getKey().getSubKey(); + protected ByteBuffer serializeUniqueValueColumn(EntityVersion entityVersion){ + + /** + * final Id entityId = ev.getEntityId(); + final UUID entityUuid = entityId.getUuid(); + final String entityType = entityId.getType(); + + CompositeBuilder builder = Composites.newDynamicCompositeBuilder(); + + builder.addUUID( entityVersion ); + builder.addUUID( entityUuid ); + builder.addString(entityType ); + */ + + String comparator = "UTF8Type"; + + List<Object> keys = new ArrayList<>(3); + keys.add(entityVersion.getEntityVersion()); + keys.add(entityVersion.getEntityId().getUuid()); + keys.add(entityVersion.getEntityId().getType()); + + // UUIDs are 16 bytes + int size = 16+16+entityVersion.getEntityId().getType().length(); + + // we always need to add length for the 2 byte comparator short, 2 byte length short and 1 byte equality + size += keys.size()*5; + + // we always add comparator to the buffer as well + size += keys.size()*comparator.length(); + + ByteBuffer stuff = ByteBuffer.allocate(size); + + for (Object key : keys) { + + if(key instanceof UUID){ + comparator = "UUIDType"; + }else{ + comparator = "UTF8Type"; // if it's not a UUID, the only other thing we're serializing is text + } + + stuff.putShort((short)comparator.length()); + stuff.put(DataType.serializeValue(comparator, ProtocolVersion.NEWEST_SUPPORTED)); + + ByteBuffer kb = DataType.serializeValue(key, ProtocolVersion.NEWEST_SUPPORTED); + if (kb == null) { + kb = ByteBuffer.allocate(0); + } + + // put a short that indicates how big the buffer is for this item + stuff.putShort((short) kb.remaining()); + + // put the actual item + stuff.put(kb.slice()); + + // put an equality byte ( again not used by part of legacy thrift Astyanax schema) + stuff.put((byte) 0); + + + } + + stuff.flip(); + return stuff.duplicate(); + } + @Override + protected List<Object> deserializeUniqueValueColumn(ByteBuffer bb){ + + List<Object> stuff = new ArrayList<>(); + int count = 0; + while(bb.hasRemaining()){ + - // custom columns have a short at beginning for comparator (which we don't use here ) - ByteBuffer comparator = CQLUtils.getWithShortLength(bb); ++ // pull of custom comparator (per Astyanax deserialize) ++ int e = CQLUtils.getShortLength(bb); ++ if((e & 'è') == 0) { ++ CQLUtils.getBytes(bb, e); ++ } else { ++ // do nothing ++ } + + ByteBuffer data = CQLUtils.getWithShortLength(bb); + + + // first two composites are UUIDs, rest are strings + if(count == 0) { + stuff.add(new UUID(data.getLong(), data.getLong())); + }else if(count ==1){ + stuff.add(new UUID(data.getLong(), data.getLong())); + }else{ + stuff.add(DataType.text().deserialize(data.slice(), ProtocolVersion.NEWEST_SUPPORTED)); + } + + byte equality = bb.get(); // we don't use this but take the equality byte off the buffer + + count++; + } + + return stuff; + + } @Override - protected CollectionPrefixedKey<Id> createEntityUniqueLogKey( final Id applicationId, - final Id uniqueValueId ) { + protected List<Object> deserializeUniqueValueLogColumn(ByteBuffer bb){ + + + /** + * List<Object> keys = new ArrayList<>(4); + keys.add(fieldEntry.getVersion()); + keys.add(fieldEntry.getField().getName()); + keys.add(fieldValueString); + keys.add(fieldEntry.getField().getTypeName().name()); + */ + List<Object> stuff = new ArrayList<>(); + int count = 0; + while(bb.hasRemaining()){ - // the comparator info is different for the UUID reversed type vs. UTF8 type - if(count ==0){ - bb.getShort(); // take the reversed comparator byte off - }else { - ByteBuffer comparator = CQLUtils.getWithShortLength(bb); - final String collectionName = LegacyScopeUtils.getCollectionScopeNameFromEntityType( uniqueValueId.getType() ); ++ // pull of custom comparator (per Astyanax deserialize) ++ int e = CQLUtils.getShortLength(bb); ++ if((e & 'è') == 0) { ++ CQLUtils.getBytes(bb, e); ++ } else { ++ // do nothing + } + ByteBuffer data = CQLUtils.getWithShortLength(bb); - final CollectionPrefixedKey<Id> collectionPrefixedEntityKey = - new CollectionPrefixedKey<>( collectionName, applicationId, uniqueValueId ); + // first composite is a UUID, rest are strings + if(count == 0) { + stuff.add(new UUID(data.getLong(), data.getLong())); + }else{ + stuff.add(DataType.text().deserialize(data.slice(), ProtocolVersion.NEWEST_SUPPORTED)); + } + byte equality = bb.get(); // we don't use this but take the equality byte off the buffer + + count++; + } + + return stuff; - return collectionPrefixedEntityKey; } http://git-wip-us.apache.org/repos/asf/usergrid/blob/b23c20a2/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyV2Impl.java ---------------------------------------------------------------------- diff --cc stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyV2Impl.java index 6ea5c1e,40622a4..92c0a5b --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyV2Impl.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyV2Impl.java @@@ -20,20 -20,21 +20,21 @@@ package org.apache.usergrid.persistence.collection.serialization.impl; -import java.util.Arrays; -import java.util.Collection; +import java.nio.ByteBuffer; +import java.util.*; -import org.apache.cassandra.db.marshal.BytesType; +import com.datastax.driver.core.DataType; +import com.datastax.driver.core.ProtocolVersion; +import com.datastax.driver.core.Session; ++import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.usergrid.persistence.collection.serialization.SerializationFig; -import org.apache.usergrid.persistence.core.astyanax.CassandraFig; -import org.apache.usergrid.persistence.core.astyanax.ColumnTypes; -import org.apache.usergrid.persistence.core.astyanax.IdRowCompositeSerializer; -import org.apache.usergrid.persistence.core.astyanax.MultiTenantColumnFamily; +import org.apache.usergrid.persistence.core.CassandraConfig; +import org.apache.usergrid.persistence.core.CassandraFig; import org.apache.usergrid.persistence.core.astyanax.MultiTenantColumnFamilyDefinition; -import org.apache.usergrid.persistence.core.astyanax.ScopedRowKey; -import org.apache.usergrid.persistence.core.astyanax.ScopedRowKeySerializer; +import org.apache.usergrid.persistence.core.datastax.CQLUtils; +import org.apache.usergrid.persistence.core.datastax.TableDefinition; import org.apache.usergrid.persistence.model.entity.Id; -import org.apache.usergrid.persistence.model.field.Field; import com.google.inject.Inject; import com.google.inject.Singleton; @@@ -124,277 -114,20 +125,284 @@@ public class UniqueValueSerializationSt @Override - protected TypeField createUniqueValueKey( final Id applicationId, final String type, final Field field) { - return new TypeField(type,field); + protected List<Object> deserializePartitionKey(ByteBuffer bb){ + + + /** + * List<Object> keys = new ArrayList<>(6); + keys.add(0, appUUID); // UUID + keys.add(1, applicationType); // String + keys.add(2, entityType); // String + keys.add(3, fieldType); // String + keys.add(4, fieldName); // String + keys.add(5, fieldValueString); // String + + */ + + List<Object> stuff = new ArrayList<>(); + while(bb.hasRemaining()){ + ByteBuffer data = CQLUtils.getWithShortLength(bb); + if(stuff.size() == 0){ + stuff.add(DataType.uuid().deserialize(data.slice(), ProtocolVersion.NEWEST_SUPPORTED)); + }else{ + stuff.add(DataType.text().deserialize(data.slice(), ProtocolVersion.NEWEST_SUPPORTED)); + } + byte equality = bb.get(); // we don't use this but take the equality byte off the buffer + + } + + return stuff; + } + @Override + protected ByteBuffer serializeUniqueValueLogColumn(UniqueFieldEntry fieldEntry){ + + /** + * final UUID version = value.getVersion(); + final Field<?> field = value.getField(); + + final FieldTypeName fieldType = field.getTypeName(); + final String fieldValue = field.getValue().toString().toLowerCase(); + + + DynamicComposite composite = new DynamicComposite( ); + + //we want to sort ascending to descending by version + composite.addComponent( version, UUID_SERIALIZER, ColumnTypes.UUID_TYPE_REVERSED); + composite.addComponent( field.getName(), STRING_SERIALIZER ); + composite.addComponent( fieldValue, STRING_SERIALIZER ); + composite.addComponent( fieldType.name() , STRING_SERIALIZER); + */ + + // values are serialized as strings, not sure why, and always lower cased + String fieldValueString = fieldEntry.getField().getValue().toString().toLowerCase(); + + + List<Object> keys = new ArrayList<>(4); + keys.add(fieldEntry.getVersion()); + keys.add(fieldEntry.getField().getName()); + keys.add(fieldValueString); + keys.add(fieldEntry.getField().getTypeName().name()); + + String comparator = UUID_TYPE_REVERSED; + + int size = 16+fieldEntry.getField().getName().length()+fieldEntry.getField().getValue().toString().length()+ + fieldEntry.getField().getTypeName().name().length(); + + // we always need to add length for the 2 byte comparator short, 2 byte length short and 1 byte equality + size += keys.size()*5; + + // uuid type comparator is longest, ensure we allocate buffer using the max size to avoid overflow + size += keys.size()*comparator.length(); + + ByteBuffer stuff = ByteBuffer.allocate(size); + + + for (Object key : keys) { + + if(key.equals(fieldEntry.getVersion())) { + int p = comparator.indexOf("(reversed=true)"); + boolean desc = false; + if (p >= 0) { + comparator = comparator.substring(0, p); + desc = true; + } + + byte a = (byte) 85; // this is the byte value for UUIDType in astyanax used in legacy data + if (desc) { + a = (byte) Character.toUpperCase((char) a); + } + + stuff.putShort((short) ('è' | a)); + }else{ + comparator = "UTF8Type"; // only strings are being serialized other than UUIDs here + stuff.putShort((short)comparator.length()); + stuff.put(DataType.serializeValue(comparator, ProtocolVersion.NEWEST_SUPPORTED)); + } + + ByteBuffer kb = DataType.serializeValue(key, ProtocolVersion.NEWEST_SUPPORTED); + if (kb == null) { + kb = ByteBuffer.allocate(0); + } + + // put a short that indicates how big the buffer is for this item + stuff.putShort((short) kb.remaining()); + + // put the actual item + stuff.put(kb.slice()); + + // put an equality byte ( again not used by part of legacy thrift Astyanax schema) + stuff.put((byte) 0); + + + } + + stuff.flip(); + return stuff.duplicate(); + + } @Override - protected Field parseRowKey( final ScopedRowKey<TypeField> rowKey ) { - return rowKey.getKey().getField(); + protected ByteBuffer getPartitionKey(Id applicationId, String entityType, String fieldType, String fieldName, Object fieldValue ){ + + return serializeKey(applicationId.getUuid(), applicationId.getType(), + entityType, fieldType, fieldName, fieldValue); + + } + + @Override + protected ByteBuffer getLogPartitionKey(final Id applicationId, final Id uniqueValueId){ + + return serializeLogKey(applicationId.getUuid(), applicationId.getType(), + uniqueValueId.getUuid(), uniqueValueId.getType()); + + } + + @Override + protected ByteBuffer serializeUniqueValueColumn(EntityVersion entityVersion){ + + /** + * final Id entityId = ev.getEntityId(); + final UUID entityUuid = entityId.getUuid(); + final String entityType = entityId.getType(); + + CompositeBuilder builder = Composites.newDynamicCompositeBuilder(); + + builder.addUUID( entityVersion ); + builder.addUUID( entityUuid ); + builder.addString(entityType ); + */ + + String comparator = "UTF8Type"; + + List<Object> keys = new ArrayList<>(3); + keys.add(entityVersion.getEntityVersion()); + keys.add(entityVersion.getEntityId().getUuid()); + keys.add(entityVersion.getEntityId().getType()); + + // UUIDs are 16 bytes + int size = 16+16+entityVersion.getEntityId().getType().length(); + + // we always need to add length for the 2 byte comparator short, 2 byte length short and 1 byte equality + size += keys.size()*5; + + // we always add comparator to the buffer as well + size += keys.size()*comparator.length(); + + ByteBuffer stuff = ByteBuffer.allocate(size); + + for (Object key : keys) { + ++ // custom comparator mappings in CQLUtils.COMPOSITE_TYPE ( more leftover from Asytanax ) + if(key instanceof UUID){ + comparator = "UUIDType"; + }else{ + comparator = "UTF8Type"; // if it's not a UUID, the only other thing we're serializing is text + } + + stuff.putShort((short)comparator.length()); + stuff.put(DataType.serializeValue(comparator, ProtocolVersion.NEWEST_SUPPORTED)); + + ByteBuffer kb = DataType.serializeValue(key, ProtocolVersion.NEWEST_SUPPORTED); + if (kb == null) { + kb = ByteBuffer.allocate(0); + } + + // put a short that indicates how big the buffer is for this item + stuff.putShort((short) kb.remaining()); + + // put the actual item + stuff.put(kb.slice()); + + // put an equality byte ( again not used by part of legacy thrift Astyanax schema) + stuff.put((byte) 0); + + + } + + stuff.flip(); + return stuff.duplicate(); + } + @Override + protected List<Object> deserializeUniqueValueColumn(ByteBuffer bb){ + + List<Object> stuff = new ArrayList<>(); + int count = 0; + while(bb.hasRemaining()){ + - // custom columns have a short at beginning for comparator (which we don't use here ) - ByteBuffer comparator = CQLUtils.getWithShortLength(bb); ++ // pull of custom comparator (per Astyanax deserialize) ++ int e = CQLUtils.getShortLength(bb); ++ if((e & 'è') == 0) { ++ CQLUtils.getBytes(bb, e); ++ } else { ++ // do nothing ++ } ++ + + ByteBuffer data = CQLUtils.getWithShortLength(bb); + + + // first two composites are UUIDs, rest are strings + if(count == 0) { + stuff.add(new UUID(data.getLong(), data.getLong())); + }else if(count ==1){ + stuff.add(new UUID(data.getLong(), data.getLong())); + }else{ + stuff.add(DataType.text().deserialize(data.slice(), ProtocolVersion.NEWEST_SUPPORTED)); + } + + byte equality = bb.get(); // we don't use this but take the equality byte off the buffer + + count++; + } + + return stuff; + + } @Override - protected Id createEntityUniqueLogKey( final Id applicationId, final Id uniqueValueId ) { - return uniqueValueId; + protected List<Object> deserializeUniqueValueLogColumn(ByteBuffer bb){ + + + /** + * List<Object> keys = new ArrayList<>(4); + keys.add(fieldEntry.getVersion()); + keys.add(fieldEntry.getField().getName()); + keys.add(fieldValueString); + keys.add(fieldEntry.getField().getTypeName().name()); + */ + + List<Object> stuff = new ArrayList<>(); + int count = 0; + while(bb.hasRemaining()){ + - // the comparator info is different for the UUID reversed type vs. UTF8 type - if(count ==0){ - bb.getShort(); // take the reversed comparator byte off - }else { - ByteBuffer comparator = CQLUtils.getWithShortLength(bb); ++ int e = CQLUtils.getShortLength(bb); ++ if((e & 'è') == 0) { ++ CQLUtils.getBytes(bb, e); ++ } else { ++ // do nothing + } + + ByteBuffer data = CQLUtils.getWithShortLength(bb); + + + // first composite is a UUID, rest are strings + if(count == 0) { + stuff.add(new UUID(data.getLong(), data.getLong())); + }else{ + stuff.add(DataType.text().deserialize(data.slice(), ProtocolVersion.NEWEST_SUPPORTED)); + } + + byte equality = bb.get(); // we don't use this but take the equality byte off the buffer + + count++; + } + + return stuff; + } http://git-wip-us.apache.org/repos/asf/usergrid/blob/b23c20a2/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesTableImpl.java ---------------------------------------------------------------------- diff --cc stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesTableImpl.java index 0000000,2cad32c..ed88ba6 mode 000000,100644..100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesTableImpl.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesTableImpl.java @@@ -1,0 -1,94 +1,101 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + package org.apache.usergrid.persistence.collection.uniquevalues; + ++import com.datastax.driver.core.BatchStatement; ++import com.datastax.driver.core.Session; ++import com.datastax.driver.core.querybuilder.Batch; + import com.google.inject.Inject; + import com.google.inject.Singleton; + import com.netflix.astyanax.MutationBatch; + import com.netflix.astyanax.connectionpool.exceptions.ConnectionException; + import org.apache.usergrid.persistence.actorsystem.ActorSystemFig; + import org.apache.usergrid.persistence.collection.serialization.UniqueValue; + import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy; + import org.apache.usergrid.persistence.collection.serialization.UniqueValueSet; + import org.apache.usergrid.persistence.collection.serialization.impl.UniqueValueImpl; + import org.apache.usergrid.persistence.core.scope.ApplicationScope; + import org.apache.usergrid.persistence.model.entity.Id; + import org.apache.usergrid.persistence.model.field.Field; + import org.slf4j.Logger; + import org.slf4j.LoggerFactory; + + import java.util.Collection; + import java.util.Collections; + import java.util.Iterator; + import java.util.UUID; + + + @Singleton + public class UniqueValuesTableImpl implements UniqueValuesTable { + private static final Logger logger = LoggerFactory.getLogger( UniqueValuesTableImpl.class ); + - final UniqueValueSerializationStrategy strat; - final UniqueValuesFig uniqueValuesFig; ++ private final UniqueValueSerializationStrategy strat; ++ private final UniqueValuesFig uniqueValuesFig; ++ private final Session session; + + @Inject - public UniqueValuesTableImpl( final UniqueValueSerializationStrategy strat, UniqueValuesFig uniqueValuesFig) { ++ public UniqueValuesTableImpl( final UniqueValueSerializationStrategy strat, ++ final UniqueValuesFig uniqueValuesFig, ++ final Session session ) { + this.strat = strat; + this.uniqueValuesFig = uniqueValuesFig; ++ this.session = session; + } + + + @Override + public Id lookupOwner( ApplicationScope scope, String type, Field field) throws ConnectionException { + + UniqueValueSet set = strat.load( scope, type, Collections.singletonList( field ) ); + UniqueValue uv = set.getValue( field.getName() ); + return uv == null ? null : uv.getEntityId(); + } + + @Override + public void reserve( ApplicationScope scope, Id owner, UUID version, Field field ) throws ConnectionException { + + UniqueValue uv = new UniqueValueImpl( field, owner, version); - final MutationBatch write = strat.write( scope, uv, uniqueValuesFig.getUniqueValueReservationTtl() ); - write.execute(); ++ final BatchStatement statement = strat.writeCQL( scope, uv, uniqueValuesFig.getUniqueValueReservationTtl() ); ++ session.execute(statement); + } + + @Override + public void confirm( ApplicationScope scope, Id owner, UUID version, Field field) throws ConnectionException { + + UniqueValue uv = new UniqueValueImpl( field, owner, version); - final MutationBatch write = strat.write( scope, uv ); - write.execute(); ++ final BatchStatement statement = strat.writeCQL( scope, uv, -1 ); ++ session.execute(statement); + + } + + @Override + public void cancel( ApplicationScope scope, Id owner, UUID version, Field field) throws ConnectionException { + + UniqueValue uv = new UniqueValueImpl( field, owner, version ); - final MutationBatch write = strat.delete( scope, uv ); - write.execute(); ++ final BatchStatement statement = strat.deleteCQL( scope, uv ); ++ session.execute(statement); + } + + @Override + public Iterator<UniqueValue> getUniqueValues(ApplicationScope scope, Id entityId) { + return strat.getAllUniqueFields( scope, entityId ); + } + + } http://git-wip-us.apache.org/repos/asf/usergrid/blob/b23c20a2/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkCommitTest.java ---------------------------------------------------------------------- diff --cc stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkCommitTest.java index b18b095,89169ac..f98a3ea --- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkCommitTest.java +++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkCommitTest.java @@@ -82,13 -71,12 +82,14 @@@ public class MarkCommitTest extends Abs //run the stage - WriteCommit newStage = new WriteCommit( logStrategy, mvccEntityStrategy, uniqueValueStrategy, session ); - WriteCommit newStage = new WriteCommit( logStrategy, mvccEntityStrategy, uniqueValueStrategy, null, null, null); ++ WriteCommit newStage ++ = new WriteCommit( logStrategy, mvccEntityStrategy, uniqueValueStrategy, null, null, null, session); + - //verify the observable is correct - Entity result = newStage.call( new CollectionIoEvent<MvccEntity>( context, mvccEntityInput ) ).getEvent().getEntity().get(); - + //verify the observable is correct + Entity result = newStage.call( + new CollectionIoEvent<MvccEntity>( context, mvccEntityInput ) ).getEvent().getEntity().get(); //verify the log entry is correct http://git-wip-us.apache.org/repos/asf/usergrid/blob/b23c20a2/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommitTest.java ---------------------------------------------------------------------- diff --cc stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommitTest.java index 60281d4,dcc473c..df0fc9e --- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommitTest.java +++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommitTest.java @@@ -92,10 -84,12 +92,13 @@@ public class WriteCommitTest extends Ab //run the stage - WriteCommit newStage = new WriteCommit( logStrategy, mvccEntityStrategy, uniqueValueStrategy, session ); + WriteCommit newStage = - new WriteCommit( logStrategy, mvccEntityStrategy, uniqueValueStrategy, null, null, null ); ++ new WriteCommit( logStrategy, mvccEntityStrategy, uniqueValueStrategy, null, null, null, session ); + - Entity result = newStage.call( new CollectionIoEvent<MvccEntity>( context, mvccEntityInput ) ).getEvent().getEntity().get(); + + Entity result = newStage.call( + new CollectionIoEvent<MvccEntity>( context, mvccEntityInput ) ).getEvent().getEntity().get(); //verify the log entry is correct @@@ -142,7 -133,7 +145,8 @@@ when( mvccEntityStrategy.write( any( ApplicationScope.class ), any( MvccEntity.class ) ) ) .thenReturn( entityMutation ); - new WriteCommit( logStrategy, mvccEntityStrategy, uniqueValueStrategy, session ).call( event ); - new WriteCommit( logStrategy, mvccEntityStrategy, uniqueValueStrategy, null, null, null ).call( event ); ++ new WriteCommit( logStrategy, mvccEntityStrategy, uniqueValueStrategy, null, null, null, session ).call( event ); ++ } } http://git-wip-us.apache.org/repos/asf/usergrid/blob/b23c20a2/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerifyIT.java ---------------------------------------------------------------------- diff --cc stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerifyIT.java index 9d0cd20,401d23e..87226be --- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerifyIT.java +++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerifyIT.java @@@ -18,12 -18,9 +18,10 @@@ package org.apache.usergrid.persistence.collection.mvcc.stage.write; - import org.junit.Rule; - import org.junit.Test; - import org.junit.runner.RunWith; - - import org.apache.usergrid.persistence.collection.EntityCollectionManager; - import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory; ++import com.datastax.driver.core.Session; + import com.google.inject.Inject; + import org.apache.usergrid.persistence.actorsystem.ActorSystemManager; + import org.apache.usergrid.persistence.collection.*; import org.apache.usergrid.persistence.collection.exception.WriteUniqueVerifyException; import org.apache.usergrid.persistence.collection.guice.TestCollectionModule; import org.apache.usergrid.persistence.collection.mvcc.stage.TestEntityGenerator; @@@ -60,8 -70,25 +71,28 @@@ public class WriteUniqueVerifyIT extend public MigrationManagerRule migrationManagerRule; @Inject + public UniqueValueSerializationStrategy uniqueValueSerializationStrategy; + + @Inject public EntityCollectionManagerFactory cmf; + @Inject + ActorSystemManager actorSystemManager; + + @Inject + UniqueValuesService uniqueValuesService; + ++ @Inject ++ Session session; ++ + + @Before + public void initAkka() { + // each test class needs unique port number + initAkka( 2553, actorSystemManager, uniqueValuesService ); + } + + @Test public void testConflict() { @@@ -137,9 -162,69 +166,69 @@@ entity.setField(new StringField("name", "Alfa Romeo 8C Competizione", true)); entity.setField(new StringField("identifier", "ar8c", true)); entity.setField(new IntegerField("top_speed_mph", 182)); - entityManager.write( entity ).toBlocking().last(); + entityManager.write( entity, null ).toBlocking().last(); entity.setField( new StringField("foo", "bar")); - entityManager.write( entity ).toBlocking().last(); + entityManager.write( entity, null ).toBlocking().last(); + } + + @Test + public void testConflictReadRepair() throws Exception { + + final Id appId = new SimpleId("testNoConflict"); + + + + final ApplicationScope scope = new ApplicationScopeImpl( appId); + + final EntityCollectionManager entityManager = cmf.createCollectionManager( scope ); + + final Entity entity = TestEntityGenerator.generateEntity(); + entity.setField(new StringField("name", "Porsche 911 GT3", true)); + entity.setField(new StringField("identifier", "911gt3", true)); + entity.setField(new IntegerField("top_speed_mph", 194)); + entityManager.write( entity, null ).toBlocking().last(); + + + FieldSet fieldSet = + entityManager.getEntitiesFromFields("test", Collections.singletonList(entity.getField("name")), true) + .toBlocking().last(); + + MvccEntity entityFetched = fieldSet.getEntity( entity.getField("name") ); + + + final Entity entityDuplicate = TestEntityGenerator.generateEntity(); + UniqueValue uniqueValue = new UniqueValueImpl(new StringField("name", "Porsche 911 GT3", true), + entityDuplicate.getId(), UUIDGenerator.newTimeUUID()); + + // manually insert a record to simulate a 'duplicate' trying to be inserted - uniqueValueSerializationStrategy. - write(scope, uniqueValue).execute(); ++ session.execute(uniqueValueSerializationStrategy. ++ writeCQL(scope, uniqueValue, -1)); + + + + FieldSet fieldSetAgain = + entityManager.getEntitiesFromFields("test", Collections.singletonList(entity.getField("name")), true) + .toBlocking().last(); + + MvccEntity entityFetchedAgain = fieldSetAgain.getEntity( entity.getField("name") ); + + assertEquals(entityFetched, entityFetchedAgain); + + + // now test writing the original entity again ( simulates a PUT ) + // this should read repair and work + entityManager.write( entity, null ).toBlocking().last(); + + FieldSet fieldSetAgainAgain = + entityManager.getEntitiesFromFields("test", Collections.singletonList(entity.getField("name")), true) + .toBlocking().last(); + + MvccEntity entityFetchedAgainAgain = fieldSetAgainAgain.getEntity( entity.getField("name") ); + + assertEquals(entityFetched, entityFetchedAgainAgain); + + + } } http://git-wip-us.apache.org/repos/asf/usergrid/blob/b23c20a2/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerifyTest.java ---------------------------------------------------------------------- diff --cc stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerifyTest.java index 3ddc14d,7afba05..1290a5c --- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerifyTest.java +++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerifyTest.java @@@ -18,17 -18,20 +18,27 @@@ package org.apache.usergrid.persistence.collection.mvcc.stage.write; ++ +import com.datastax.driver.core.Session; - import org.junit.Rule; - import org.junit.Test; - import org.junit.runner.RunWith; ++ + import com.google.inject.Inject; + import com.netflix.astyanax.Keyspace; + import com.netflix.astyanax.MutationBatch; + import com.netflix.astyanax.connectionpool.exceptions.ConnectionException; + import org.apache.usergrid.persistence.actorsystem.ActorSystemManager; + import org.apache.usergrid.persistence.collection.AbstractUniqueValueTest; + import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory; + import org.apache.usergrid.persistence.collection.MvccEntity; import org.apache.usergrid.persistence.collection.guice.TestCollectionModule; import org.apache.usergrid.persistence.collection.mvcc.stage.CollectionIoEvent; import org.apache.usergrid.persistence.collection.serialization.SerializationFig; import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy; ++ +import org.apache.usergrid.persistence.core.CassandraConfig; ++ + import org.apache.usergrid.persistence.collection.uniquevalues.UniqueValuesService; -import org.apache.usergrid.persistence.core.astyanax.CassandraConfig; ++ import org.apache.usergrid.persistence.core.guice.MigrationManagerRule; import org.apache.usergrid.persistence.core.scope.ApplicationScope; import org.apache.usergrid.persistence.core.test.ITRunner; @@@ -84,12 -96,11 +106,12 @@@ public class WriteUniqueVerifyTest exte final MvccEntity mvccEntity = fromEntity( entity ); // run the stage - WriteUniqueVerify newStage = new WriteUniqueVerify( uvstrat, fig, keyspace,cassandraConfig, session ); - WriteUniqueVerify newStage = new WriteUniqueVerify( uvstrat, fig, keyspace, cassandraConfig, null, null, null ); ++ WriteUniqueVerify newStage = new WriteUniqueVerify( uvstrat, fig, keyspace, cassandraConfig, null, null, null, session ); ++ - newStage.call( - new CollectionIoEvent<>( collectionScope, mvccEntity ) ) ; + newStage.call( new CollectionIoEvent<>( collectionScope, mvccEntity ) ) ; - //if we get here, it's a success. We want to test no exceptions are thrown + // if we get here, it's a success. We want to test no exceptions are thrown verify(batch, never()).execute(); }
