Re-write SCOPED_CACHE serialization to use Datastax driver and CQL.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/4475158f
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/4475158f
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/4475158f

Branch: refs/heads/master
Commit: 4475158f1b48aa485d6af7d90caa91948ac2f46f
Parents: 6a1fd22
Author: Michael Russo <[email protected]>
Authored: Sun May 1 16:56:56 2016 +0800
Committer: Michael Russo <[email protected]>
Committed: Sun May 1 16:56:56 2016 +0800

----------------------------------------------------------------------
 .../impl/ScopedCacheSerializationImpl.java | 311 ++++++++++---------
 1 file changed, 170 insertions(+), 141 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/4475158f/stack/corepersistence/cache/src/main/java/org/apache/usergrid/persistence/cache/impl/ScopedCacheSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/cache/src/main/java/org/apache/usergrid/persistence/cache/impl/ScopedCacheSerializationImpl.java b/stack/corepersistence/cache/src/main/java/org/apache/usergrid/persistence/cache/impl/ScopedCacheSerializationImpl.java
index 1646d36..ffa5f1f 100644
--- a/stack/corepersistence/cache/src/main/java/org/apache/usergrid/persistence/cache/impl/ScopedCacheSerializationImpl.java
+++ b/stack/corepersistence/cache/src/main/java/org/apache/usergrid/persistence/cache/impl/ScopedCacheSerializationImpl.java
@@ -16,10 +16,16 @@
  */
 package org.apache.usergrid.persistence.cache.impl;

+import com.datastax.driver.core.*;
+import com.datastax.driver.core.querybuilder.Clause;
+import com.datastax.driver.core.querybuilder.QueryBuilder;
+import com.datastax.driver.core.querybuilder.Using;
 import com.fasterxml.jackson.annotation.JsonAutoDetect;
 import com.fasterxml.jackson.annotation.PropertyAccessor;
+import com.fasterxml.jackson.core.JsonParseException;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.JsonMappingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Preconditions;
 import com.google.common.hash.Funnel;
@@ -39,6 +45,7 @@ import com.netflix.astyanax.serializers.StringSerializer;
 import org.apache.cassandra.db.marshal.BytesType;
 import org.apache.usergrid.persistence.cache.CacheScope;
 import org.apache.usergrid.persistence.core.astyanax.*;
+import org.apache.usergrid.persistence.core.datastax.CQLUtils;
 import org.apache.usergrid.persistence.core.datastax.TableDefinition;
 import org.apache.usergrid.persistence.core.shard.ExpandingShardLocator;
 import org.apache.usergrid.persistence.core.shard.StringHashUtils;
@@ -46,9 +53,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

 import java.io.IOException;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
+import java.nio.ByteBuffer;
+import java.util.*;
 import java.util.concurrent.Callable;


@@ -57,55 +63,50 @@ import java.util.concurrent.Callable;
  */
 public class ScopedCacheSerializationImpl<K,V> implements ScopedCacheSerialization<K,V> {

-    // row-keys are application ID + consistent hash key
-    // column names are K key toString()
-    // column values are serialization of V value
-
     public static final Logger logger = LoggerFactory.getLogger(ScopedCacheSerializationImpl.class);

+    // row-keys are (app UUID, application type, app UUID as string, consistent hash int as bucket number)
+    // column names are K key toString()
+    // column values are serialization of V value

-    private static final CacheRowKeySerializer ROWKEY_SERIALIZER = new CacheRowKeySerializer();
-
-    private static final BucketScopedRowKeySerializer<String> BUCKET_ROWKEY_SERIALIZER =
-        new BucketScopedRowKeySerializer<>( ROWKEY_SERIALIZER );
-
-    private static final Serializer<String> COLUMN_NAME_SERIALIZER = StringSerializer.get();
+    private static final String SCOPED_CACHE_TABLE = CQLUtils.quote("SCOPED_CACHE");
+    private static final Collection<String> SCOPED_CACHE_PARTITION_KEYS = Collections.singletonList("key");
+    private static final Collection<String> SCOPED_CACHE_COLUMN_KEYS = Collections.singletonList("column1");
+    private static final Map<String, DataType.Name> SCOPED_CACHE_COLUMNS =
+        new HashMap<String, DataType.Name>() {{
+            put( "key", DataType.Name.BLOB );
+            put( "column1", DataType.Name.BLOB );
+            put( "value", DataType.Name.BLOB ); }};
+    private static final Map<String, String> SCOPED_CACHE_CLUSTERING_ORDER =
+        new HashMap<String, String>(){{ put( "column1", "ASC" ); }};

-    private static final ObjectSerializer COLUMN_VALUE_SERIALIZER = ObjectSerializer.get();

-    public static final MultiTenantColumnFamily<BucketScopedRowKey<String>, String> SCOPED_CACHE
-        = new MultiTenantColumnFamily<>( "SCOPED_CACHE",
-            BUCKET_ROWKEY_SERIALIZER, COLUMN_NAME_SERIALIZER, COLUMN_VALUE_SERIALIZER );

     /** Number of buckets to hash across */
     private static final int[] NUM_BUCKETS = {20};

     /** How to funnel keys for buckets */
-    private static final Funnel<String> MAP_KEY_FUNNEL = new Funnel<String>() {
-
-        @Override
-        public void funnel( final String key, final PrimitiveSink into ) {
-            into.putString(key, StringHashUtils.UTF8);
-        }
-    };
+    private static final Funnel<String> MAP_KEY_FUNNEL =
+        (Funnel<String>) (key, into) -> into.putString(key, StringHashUtils.UTF8);

-    /**
-     * Locator to get us all buckets
-     */
+    /** Locator to get us all buckets */
     private static final ExpandingShardLocator<String> BUCKET_LOCATOR =
         new ExpandingShardLocator<>(MAP_KEY_FUNNEL, NUM_BUCKETS);

-    private final Keyspace keyspace;
+    private final Session session;
+    private final CassandraConfig cassandraConfig;

     private final ObjectMapper MAPPER = new ObjectMapper();

-    //------------------------------------------------------------------------------------------
+
     @Inject
-    public ScopedCacheSerializationImpl( final Keyspace keyspace ) {
-        this.keyspace = keyspace;
-        //MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+    public ScopedCacheSerializationImpl( final Session session,
+                                         final CassandraConfig cassandraConfig ) {
+        this.session = session;
+        this.cassandraConfig = cassandraConfig;
+
         MAPPER.enableDefaultTyping();
         MAPPER.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.NONE);
         MAPPER.setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY);
@@ -115,77 +116,76 @@ public class ScopedCacheSerializationImpl<K,V> implements ScopedCacheSerializati

     @Override
     public V readValue(CacheScope scope, K key, TypeReference typeRef ) {
+        return readValueCQL( scope, key, typeRef);
+
+    }
+
+
+    private V readValueCQL(CacheScope scope, K key, TypeReference typeRef){
+
         Preconditions.checkNotNull(scope, "scope is required");
         Preconditions.checkNotNull(key, "key is required");

-        // determine bucketed row-key based application UUID
-        String rowKeyString = scope.getApplication().getUuid().toString();
+        final String rowKeyString = scope.getApplication().getUuid().toString();
         final int bucket = BUCKET_LOCATOR.getCurrentBucket(rowKeyString);
-        final BucketScopedRowKey<String> keyRowKey =
-            BucketScopedRowKey.fromKey(scope.getApplication(), rowKeyString, bucket);

         // determine column name based on K key to string
-        String columnName = key.toString();
+        final String columnName = key.toString();

-        try {
-            try {
-                Column<String> result = keyspace.prepareQuery(SCOPED_CACHE)
-                    .getKey(keyRowKey).getColumn( columnName ).execute().getResult();
-
-                result.getByteBufferValue();
-                //V value = MAPPER.readValue(result.getByteArrayValue(), new TypeReference<V>() {});
-                V value = MAPPER.readValue(result.getByteArrayValue(), typeRef);
-
-                logger.debug("Read cache item from scope {}\n key/value types {}/{}\n key:value: {}:{}",
-                    scope.getApplication().getUuid(),
-                    key.getClass().getSimpleName(),
-                    value.getClass().getSimpleName(),
-                    key,
-                    value);
-
-                return value;
-
-            } catch (NotFoundException nfe) {
-                if(logger.isDebugEnabled()) {
-                    logger.debug("Value not found");
-                }
-
-            } catch (IOException ioe) {
-                logger.error("Unable to read cached value", ioe);
-                throw new RuntimeException("Unable to read cached value", ioe);
+        final Clause inKey = QueryBuilder.eq("key", getPartitionKey(scope, rowKeyString, bucket) );
+        final Clause inColumn = QueryBuilder.eq("column1", DataType.text().serialize(columnName, ProtocolVersion.NEWEST_SUPPORTED) );
+
+        final Statement statement = QueryBuilder.select().all().from(SCOPED_CACHE_TABLE)
+            .where(inKey)
+            .and(inColumn)
+            .setConsistencyLevel(cassandraConfig.getDataStaxReadCl());
+
+        final ResultSet resultSet = session.execute(statement);
+        final com.datastax.driver.core.Row row = resultSet.one();
+
+        if (row == null){
+
+            if(logger.isDebugEnabled()){
+                logger.debug("Cache value not found for key {}", key );
             }
-        } catch (ConnectionException e) {
-            throw new RuntimeException("Unable to connect to cassandra", e);
+            return null;
         }

-        if(logger.isDebugEnabled()){
-            logger.debug("Cache value not found for key {}", key );
+        try {
+
+            return MAPPER.readValue(row.getBytes("value").array(), typeRef);
+
+        } catch (IOException ioe) {
+            logger.error("Unable to read cached value", ioe);
+            throw new RuntimeException("Unable to read cached value", ioe);
         }

-        return null;
+
     }


     @Override
     public V writeValue(CacheScope scope, K key, V value, Integer ttl) {

+        return writeValueCQL( scope, key, value, ttl);
+
+    }
+
+    private V writeValueCQL(CacheScope scope, K key, V value, Integer ttl) {
+
         Preconditions.checkNotNull( scope, "scope is required");
         Preconditions.checkNotNull( key, "key is required" );
         Preconditions.checkNotNull( value, "value is required");
         Preconditions.checkNotNull( ttl, "ttl is required");

-        // determine bucketed row-key based application UUID
-        String rowKeyString = scope.getApplication().getUuid().toString();
+        final String rowKeyString = scope.getApplication().getUuid().toString();
         final int bucket = BUCKET_LOCATOR.getCurrentBucket(rowKeyString);
-        final BucketScopedRowKey<String> keyRowKey =
-            BucketScopedRowKey.fromKey(scope.getApplication(), rowKeyString, bucket);

-        // determine column name based on K key to string
-        String columnName = key.toString();
+        final String columnName = key.toString();

         // serialize cache item
         byte[] cacheBytes;
@@ -195,127 +195,156 @@ public class ScopedCacheSerializationImpl<K,V> implements ScopedCacheSerializati
             throw new RuntimeException("Unable to serialize cache value", jpe);
         }

-        // serialize to the entry
-        final MutationBatch batch = keyspace.prepareMutationBatch();
-        batch.withRow(SCOPED_CACHE, keyRowKey).putColumn(columnName, cacheBytes, ttl);
+        final Using timeToLive = QueryBuilder.ttl(ttl);
+
+
+        // convert to ByteBuffer for the blob DataType in Cassandra
+        final ByteBuffer bb = ByteBuffer.allocate(cacheBytes.length);
+        bb.put(cacheBytes);
+        bb.flip();
+
+        final Statement cacheEntry = QueryBuilder.insertInto(SCOPED_CACHE_TABLE)
+            .using(timeToLive)
+            .value("key", getPartitionKey(scope, rowKeyString, bucket))
+            .value("column1", DataType.text().serialize(columnName, ProtocolVersion.NEWEST_SUPPORTED))
+            .value("value", bb);
+

-        executeBatch(batch);
+        session.execute(cacheEntry);

         logger.debug("Wrote cache item to scope {}\n key/value types {}/{}\n key:value: {}:{}",
-            scope.getApplication().getUuid(),
-            key.getClass().getSimpleName(),
-            value.getClass().getSimpleName(),
-            key,
-            value);
+                scope.getApplication().getUuid(),
+                key.getClass().getSimpleName(),
+                value.getClass().getSimpleName(),
+                key,
+                value);

         return value;
+
     }

+
     @Override
     public void removeValue(CacheScope scope, K key) {

+        removeValueCQL(scope, key);
+
+    }
+
+
+    private void removeValueCQL(CacheScope scope, K key) {
+
         Preconditions.checkNotNull( scope, "scope is required");
         Preconditions.checkNotNull( key, "key is required" );

         // determine bucketed row-key based application UUID
-        String rowKeyString = scope.getApplication().getUuid().toString();
+        final String rowKeyString = scope.getApplication().getUuid().toString();
         final int bucket = BUCKET_LOCATOR.getCurrentBucket(rowKeyString);
-        final BucketScopedRowKey<String> keyRowKey =
-            BucketScopedRowKey.fromKey(scope.getApplication(), rowKeyString, bucket);

-        // determine column name based on K key to string
-        String columnName = key.toString();
+        final String columnName = key.toString();

-        final MutationBatch batch = keyspace.prepareMutationBatch();
-        batch.withRow(SCOPED_CACHE, keyRowKey).deleteColumn(columnName);
-        executeBatch(batch);
-    }
+        final Clause inKey = QueryBuilder.eq("key", getPartitionKey(scope, rowKeyString, bucket) );
+        final Clause inColumn = QueryBuilder.eq("column1", DataType.text().serialize(columnName, ProtocolVersion.NEWEST_SUPPORTED) );
+
+        final Statement statement = QueryBuilder.delete().from(SCOPED_CACHE_TABLE)
+            .where(inKey)
+            .and(inColumn);

+        session.execute(statement);
+
+    }
+
+
     @Override
     public void invalidate(CacheScope scope) {

+        invalidateCQL(scope);
+        logger.debug("Invalidated scope {}", scope.getApplication().getUuid());
+
+    }
+
+    private void invalidateCQL(CacheScope scope){
+
         Preconditions.checkNotNull(scope, "scope is required");

         // determine bucketed row-key based application UUID
-        String rowKeyString = scope.getApplication().getUuid().toString();
+        final String rowKeyString = scope.getApplication().getUuid().toString();
         final int bucket = BUCKET_LOCATOR.getCurrentBucket(rowKeyString);
-        final BucketScopedRowKey<String> keyRowKey =
-            BucketScopedRowKey.fromKey(scope.getApplication(), rowKeyString, bucket);

-        final MutationBatch batch = keyspace.prepareMutationBatch();
+        final Clause inKey = QueryBuilder.eq("key", getPartitionKey(scope, rowKeyString, bucket) );

-        batch.withRow(SCOPED_CACHE, keyRowKey).delete();
+        final Statement statement = QueryBuilder.delete().from(SCOPED_CACHE_TABLE)
+            .where(inKey);

-        final OperationResult<Void> result = executeBatch(batch);
+        session.execute(statement);

-        logger.debug("Invalidated scope {}", scope.getApplication().getUuid());
     }


+    @Override
+    public Collection<MultiTenantColumnFamilyDefinition> getColumnFamilies() {

-    private class MutationBatchExec implements Callable<Void> {
-        private final MutationBatch myBatch;
-        private MutationBatchExec(MutationBatch batch) {
-            myBatch = batch;
-        }
-        @Override
-        public Void call() throws Exception {
-            myBatch.execute();
-            return null;
-        }
+        return Collections.emptyList();
     }


+    @Override
+    public Collection<TableDefinition> getTables() {

-    private OperationResult<Void> executeBatch(MutationBatch batch) {
-        try {
-            return batch.execute();
+        final TableDefinition scopedCache =
+            new TableDefinition( SCOPED_CACHE_TABLE, SCOPED_CACHE_PARTITION_KEYS, SCOPED_CACHE_COLUMN_KEYS,
+                SCOPED_CACHE_COLUMNS, TableDefinition.CacheOption.KEYS, SCOPED_CACHE_CLUSTERING_ORDER);

-        } catch (ConnectionException e) {
-            throw new RuntimeException("Unable to connect to cassandra", e);
-        }
+        return Collections.singletonList(scopedCache);
     }

-    //------------------------------------------------------------------------------------------

-    @Override
-    public Collection<MultiTenantColumnFamilyDefinition> getColumnFamilies() {
-        final MultiTenantColumnFamilyDefinition scopedCache =
-            new MultiTenantColumnFamilyDefinition( SCOPED_CACHE,
-                BytesType.class.getSimpleName(),
-                BytesType.class.getSimpleName(),
-                BytesType.class.getSimpleName(),
-                MultiTenantColumnFamilyDefinition.CacheOption.KEYS );
+    private ByteBuffer getPartitionKey(CacheScope scope, String key, int bucketNumber){
+
+        return serializeKeys(scope.getApplication().getUuid(),
+            scope.getApplication().getType(), bucketNumber, key);

-        return Collections.singletonList(scopedCache);
     }

-    @Override
-    public Collection<TableDefinition> getTables() {
+    private static ByteBuffer serializeKeys(UUID ownerUUID, String ownerType, int bucketNumber, String rowKeyString ){

-        return Collections.emptyList();
-    }
+        List<Object> keys = new ArrayList<>(4);
+        keys.add(0, ownerUUID);
+        keys.add(1, ownerType);
+        keys.add(2, bucketNumber);
+        keys.add(3, rowKeyString);
+
+        // UUIDs are 16 bytes, allocate the buffer accordingly
+        int size = 16+ownerType.length()+rowKeyString.length();

-    /**
-     * Inner class to serialize cache key
-     */
-    private static class CacheRowKeySerializer implements CompositeFieldSerializer<String> {
+        // ints are 4 bytes, add for the bucket
+        size += 4;
+
+        // we always need to add length for the 2 byte short and 1 byte equality
+        size += keys.size()*3;
+
+        ByteBuffer stuff = ByteBuffer.allocate(size);
+
+        for (Object key : keys) {
+
+            ByteBuffer kb = DataType.serializeValue(key, ProtocolVersion.NEWEST_SUPPORTED);
+            if (kb == null) {
+                kb = ByteBuffer.allocate(0);
+            }
+
+            stuff.putShort((short) kb.remaining());
+            stuff.put(kb.slice());
+            stuff.put((byte) 0);

-        @Override
-        public void toComposite( final CompositeBuilder builder, final String key ) {
-            builder.addString(key);
-        }

-        @Override
-        public String fromComposite( final CompositeParser composite ) {
-            final String key = composite.readString();
-            return key;
         }
+        stuff.flip();
+        return stuff.duplicate();
+
     }
 }
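
Editor's note (not part of the commit): the subtle piece of this change is that the CQL "key" blob must stay byte-compatible with the legacy Thrift/Astyanax composite row key, which is what serializeKeys() above produces: each component (app UUID, app type, bucket number, app UUID as string) is serialized by the driver, prefixed with a 2-byte length, and terminated with a single end-of-component byte. Per the TableDefinition returned by getTables(), the implied table is roughly "SCOPED_CACHE" (key blob, column1 blob, value blob, PRIMARY KEY (key, column1)) clustered ascending on column1. The sketch below is illustrative only; the class name and sample values (bucket 7, the "application" type string) are made up, and it assumes the DataStax Java driver used elsewhere in this diff is on the classpath.

import com.datastax.driver.core.DataType;
import com.datastax.driver.core.ProtocolVersion;

import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
import java.util.UUID;

// Illustrative sketch: builds a SCOPED_CACHE partition key the same way serializeKeys()
// does -- 2-byte length + component bytes + one 0x00 end-of-component byte per component.
public class ScopedCachePartitionKeySketch {

    public static void main(String[] args) {

        UUID appUuid = UUID.randomUUID();   // hypothetical application UUID
        int bucket = 7;                     // hypothetical bucket from ExpandingShardLocator
        List<Object> components =
            Arrays.asList(appUuid, "application", bucket, appUuid.toString());

        // serialize each component with the driver and total up the framed size
        int size = 0;
        ByteBuffer[] serialized = new ByteBuffer[components.size()];
        for (int i = 0; i < components.size(); i++) {
            serialized[i] = DataType.serializeValue(components.get(i), ProtocolVersion.NEWEST_SUPPORTED);
            size += 3 + serialized[i].remaining(); // 2-byte length short + 1-byte end-of-component
        }

        ByteBuffer key = ByteBuffer.allocate(size);
        for (ByteBuffer component : serialized) {
            key.putShort((short) component.remaining());
            key.put(component.slice());
            key.put((byte) 0);
        }
        key.flip();

        // 16 (uuid) + 11 ("application") + 4 (int) + 36 (uuid string) + 4*3 framing = 79 bytes
        System.out.println("composite partition key is " + key.remaining() + " bytes");
    }
}

A key of this shape is what readValueCQL/writeValueCQL pass to QueryBuilder.eq("key", ...) and insertInto(...).value("key", ...), so existing rows written through Astyanax remain addressable after the switch to CQL.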
