Convert map manager serialization to read and write using CQL using the original Astyanax data modeling in Cassandra (aka no migration required).
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/2203f433 Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/2203f433 Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/2203f433 Branch: refs/heads/master Commit: 2203f43336ae86ebcad823fb740207b4eecc36bb Parents: ad4a337 Author: Michael Russo <[email protected]> Authored: Thu Feb 11 14:49:44 2016 -0800 Committer: Michael Russo <[email protected]> Committed: Thu Feb 11 14:49:44 2016 -0800 ---------------------------------------------------------------------- .../core/astyanax/CassandraConfig.java | 19 + .../core/astyanax/CassandraConfigImpl.java | 27 ++ .../persistence/core/datastax/CQLUtils.java | 77 +++- .../core/datastax/TableDefinition.java | 22 +- .../core/astyanax/ColumnNameIteratorTest.java | 16 + .../MultiKeyColumnNameIteratorTest.java | 16 + .../astyanax/MultiRowColumnIteratorTest.java | 16 + .../persistence/core/datastax/CQLUtilsTest.java | 15 +- .../core/datastax/TableDefinitionTest.java | 8 +- .../persistence/map/impl/MapManagerImpl.java | 1 - .../map/impl/MapSerializationImpl.java | 391 ++++++++++--------- .../persistence/map/MapManagerTest.java | 16 + 12 files changed, 407 insertions(+), 217 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/usergrid/blob/2203f433/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/CassandraConfig.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/CassandraConfig.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/CassandraConfig.java index c506d2d..8cb96ac 100644 --- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/CassandraConfig.java +++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/CassandraConfig.java @@ -48,6 +48,25 @@ public interface CassandraConfig { */ ConsistencyLevel getWriteCL(); + + /** + * Get the currently configured read CL for DataStax driver + * @return + */ + com.datastax.driver.core.ConsistencyLevel getDataStaxReadCl(); + + /** + * Get the currently configured write CL for DataStax driver + * @return + */ + com.datastax.driver.core.ConsistencyLevel getDataStaxWriteCl(); + + /** + * Get the currently configured consistent read CL for DataStax driver + * @return + */ + com.datastax.driver.core.ConsistencyLevel getDataStaxReadConsistentCl(); + /** * Return the number of shards that has been set in the property file * @return http://git-wip-us.apache.org/repos/asf/usergrid/blob/2203f433/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/CassandraConfigImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/CassandraConfigImpl.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/CassandraConfigImpl.java index a9b37fd..15f434c 100644 --- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/CassandraConfigImpl.java +++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/CassandraConfigImpl.java @@ -41,6 +41,12 @@ public class CassandraConfigImpl implements CassandraConfig { private int[] shardSettings; private ConsistencyLevel consistentCl; + // DataStax driver's CL + private com.datastax.driver.core.ConsistencyLevel dataStaxReadCl; + private com.datastax.driver.core.ConsistencyLevel dataStaxWriteCl; + private com.datastax.driver.core.ConsistencyLevel dataStaxReadConsistentCl; + + @Inject public CassandraConfigImpl( final CassandraFig cassandraFig ) { @@ -53,6 +59,12 @@ public class CassandraConfigImpl implements CassandraConfig { this.consistentCl = ConsistencyLevel.valueOf(cassandraFig.getAstyanaxConsistentReadCL()); + this.dataStaxReadCl = com.datastax.driver.core.ConsistencyLevel.valueOf( cassandraFig.getReadCl()); + + this.dataStaxReadConsistentCl = com.datastax.driver.core.ConsistencyLevel.valueOf( cassandraFig.getReadCl()); + + this.dataStaxWriteCl = com.datastax.driver.core.ConsistencyLevel.valueOf( cassandraFig.getWriteCl() ); + //add the listeners to update the values cassandraFig.addPropertyChangeListener( new PropertyChangeListener() { @Override @@ -89,6 +101,21 @@ public class CassandraConfigImpl implements CassandraConfig { return writeCl; } + @Override + public com.datastax.driver.core.ConsistencyLevel getDataStaxReadCl() { + return dataStaxReadCl; + } + + @Override + public com.datastax.driver.core.ConsistencyLevel getDataStaxWriteCl() { + return dataStaxWriteCl; + } + + @Override + public com.datastax.driver.core.ConsistencyLevel getDataStaxReadConsistentCl() { + return dataStaxReadConsistentCl; + } + @Override public int[] getShardSettings() { http://git-wip-us.apache.org/repos/asf/usergrid/blob/2203f433/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/CQLUtils.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/CQLUtils.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/CQLUtils.java index 0a7408a..7dee9c8 100644 --- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/CQLUtils.java +++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/CQLUtils.java @@ -18,14 +18,14 @@ */ package org.apache.usergrid.persistence.core.datastax; +import com.datastax.driver.core.DataType; +import com.datastax.driver.core.ProtocolVersion; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.usergrid.persistence.core.util.StringUtils; -import java.util.HashMap; -import java.util.LinkedHashMap; -import java.util.Map; -import java.util.StringJoiner; +import java.nio.ByteBuffer; +import java.util.*; public class CQLUtils { @@ -45,6 +45,10 @@ public class CQLUtils { static String PRIMARY_KEY = "PRIMARY KEY"; static String COMPACT_STORAGE = "COMPACT STORAGE"; static String CLUSTERING_ORDER_BY = "CLUSTERING ORDER BY"; + static String COMMA = ","; + static String PAREN_LEFT = "("; + static String PAREN_RIGHT = ")"; + private final static ObjectMapper mapper = new ObjectMapper(); @@ -80,22 +84,28 @@ public class CQLUtils { throw new Exception("Invalid Action specified. Must of of type CQLUtils.Action"); } - cql.add( "\""+tableDefinition.getTableName()+"\"" ); - + cql.add( quote( tableDefinition.getTableName() ) ); - StringJoiner columnsString = new StringJoiner(","); - Map<String, String> columns = tableDefinition.getColumns(); - columns.forEach( (key, value) -> columnsString.add(key+" "+value)); - columnsString.add(PRIMARY_KEY +" ( "+StringUtils.join(tableDefinition.getPrimaryKeys(), ",") + " )"); - StringJoiner orderingString = new StringJoiner(" "); - Map<String, String> ordering = tableDefinition.getClusteringOrder(); - ordering.forEach( (key, value) -> orderingString.add(key+" "+value)); if ( tableAction.equals(ACTION.CREATE) ){ - cql.add("(").add(columnsString.toString()).add(")") + + cql.add(PAREN_LEFT).add( spaceSeparatedKeyValue(tableDefinition.getColumns()) ).add(COMMA) + .add(PRIMARY_KEY) + .add(PAREN_LEFT).add(PAREN_LEFT) + .add( StringUtils.join(tableDefinition.getPartitionKeys(), COMMA) ).add(PAREN_RIGHT); + + if ( tableDefinition.getColumnKeys() != null && !tableDefinition.getColumnKeys().isEmpty() ){ + + cql.add(COMMA).add( StringUtils.join(tableDefinition.getColumnKeys(), COMMA) ); + } + + cql.add(PAREN_RIGHT).add(PAREN_RIGHT) .add(WITH) - .add(CLUSTERING_ORDER_BY).add("(").add(orderingString.toString()).add(")") + .add(CLUSTERING_ORDER_BY) + .add(PAREN_LEFT) + .add( spaceSeparatedKeyValue(tableDefinition.getClusteringOrder()) ) + .add(PAREN_RIGHT) .add(AND) .add(COMPACT_STORAGE) .add(AND); @@ -118,5 +128,42 @@ public class CQLUtils { } + public static String quote( String value){ + + return "\"" + value + "\""; + + } + + public static String spaceSeparatedKeyValue(Map<String, String> columns){ + + StringJoiner columnsSchema = new StringJoiner(","); + columns.forEach( (key, value) -> columnsSchema.add(key+" "+value)); + + return columnsSchema.toString(); + + } + + + /** + * Below functions borrowed from Astyanax until the schema is re-written to be more CQL friendly + */ + + public static int getShortLength(ByteBuffer bb) { + int length = (bb.get() & 255) << 8; + return length | bb.get() & 255; + } + + public static ByteBuffer getBytes(ByteBuffer bb, int length) { + ByteBuffer copy = bb.duplicate(); + copy.limit(copy.position() + length); + bb.position(bb.position() + length); + return copy; + } + + public static ByteBuffer getWithShortLength(ByteBuffer bb) { + int length = getShortLength(bb); + return getBytes(bb, length); + } + } http://git-wip-us.apache.org/repos/asf/usergrid/blob/2203f433/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/TableDefinition.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/TableDefinition.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/TableDefinition.java index 801eaa7..0635b93 100644 --- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/TableDefinition.java +++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/TableDefinition.java @@ -53,7 +53,8 @@ public class TableDefinition { private final String tableName; - private final Collection<String> primaryKeys; + private final Collection<String> partitionKeys; + private final Collection<String> columnKeys; private final Map<String, String> columns; private final CacheOption cacheOption; private final Map<String, Object> compaction; @@ -63,18 +64,19 @@ public class TableDefinition { private final String gcGraceSeconds; private final Map<String, String> clusteringOrder; - public TableDefinition( final String tableName, final Collection<String> primaryKeys, - final Map<String, String> columns, final CacheOption cacheOption, - final Map<String, String> clusteringOrder){ + public TableDefinition( final String tableName, final Collection<String> partitionKeys, + final Collection<String> columnKeys, final Map<String, String> columns, + final CacheOption cacheOption, final Map<String, String> clusteringOrder){ Preconditions.checkNotNull(tableName, "Table name cannot be null"); - Preconditions.checkNotNull(primaryKeys, "Primary Key(s) cannot be null"); + Preconditions.checkNotNull(partitionKeys, "Primary Key(s) cannot be null"); Preconditions.checkNotNull(columns, "Columns cannot be null"); Preconditions.checkNotNull(cacheOption, "CacheOption cannot be null"); this.tableName = tableName; - this.primaryKeys = primaryKeys; + this.partitionKeys = partitionKeys; + this.columnKeys = columnKeys; this.columns = columns; this.cacheOption = cacheOption; this.clusteringOrder = clusteringOrder; @@ -97,8 +99,12 @@ public class TableDefinition { return tableName; } - public Collection<String> getPrimaryKeys() { - return primaryKeys; + public Collection<String> getPartitionKeys() { + return partitionKeys; + } + + public Collection<String> getColumnKeys() { + return columnKeys; } public Map<String, String> getColumns() { http://git-wip-us.apache.org/repos/asf/usergrid/blob/2203f433/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/ColumnNameIteratorTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/ColumnNameIteratorTest.java b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/ColumnNameIteratorTest.java index 18c9327..dccbd45 100644 --- a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/ColumnNameIteratorTest.java +++ b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/ColumnNameIteratorTest.java @@ -93,6 +93,22 @@ public class ColumnNameIteratorTest { return ConsistencyLevel.CL_QUORUM; } + @Override + public com.datastax.driver.core.ConsistencyLevel getDataStaxReadCl() { + return com.datastax.driver.core.ConsistencyLevel.LOCAL_ONE; + } + + @Override + public com.datastax.driver.core.ConsistencyLevel getDataStaxReadConsistentCl() { + return com.datastax.driver.core.ConsistencyLevel.ALL; + } + + + @Override + public com.datastax.driver.core.ConsistencyLevel getDataStaxWriteCl() { + return com.datastax.driver.core.ConsistencyLevel.QUORUM; + } + @Override public int[] getShardSettings() { http://git-wip-us.apache.org/repos/asf/usergrid/blob/2203f433/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/MultiKeyColumnNameIteratorTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/MultiKeyColumnNameIteratorTest.java b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/MultiKeyColumnNameIteratorTest.java index bd1ea55..d020949 100644 --- a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/MultiKeyColumnNameIteratorTest.java +++ b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/MultiKeyColumnNameIteratorTest.java @@ -98,6 +98,22 @@ public class MultiKeyColumnNameIteratorTest { return ConsistencyLevel.CL_QUORUM; } + @Override + public com.datastax.driver.core.ConsistencyLevel getDataStaxReadCl() { + return com.datastax.driver.core.ConsistencyLevel.LOCAL_ONE; + } + + @Override + public com.datastax.driver.core.ConsistencyLevel getDataStaxReadConsistentCl() { + return com.datastax.driver.core.ConsistencyLevel.ALL; + } + + + @Override + public com.datastax.driver.core.ConsistencyLevel getDataStaxWriteCl() { + return com.datastax.driver.core.ConsistencyLevel.QUORUM; + } + @Override public int[] getShardSettings() { http://git-wip-us.apache.org/repos/asf/usergrid/blob/2203f433/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIteratorTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIteratorTest.java b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIteratorTest.java index 9f5741b..8bcdcb2 100644 --- a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIteratorTest.java +++ b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIteratorTest.java @@ -101,6 +101,22 @@ public class MultiRowColumnIteratorTest { return ConsistencyLevel.CL_QUORUM; } + @Override + public com.datastax.driver.core.ConsistencyLevel getDataStaxReadCl() { + return com.datastax.driver.core.ConsistencyLevel.LOCAL_ONE; + } + + @Override + public com.datastax.driver.core.ConsistencyLevel getDataStaxReadConsistentCl() { + return com.datastax.driver.core.ConsistencyLevel.ALL; + } + + + @Override + public com.datastax.driver.core.ConsistencyLevel getDataStaxWriteCl() { + return com.datastax.driver.core.ConsistencyLevel.QUORUM; + } + @Override public int[] getShardSettings() { http://git-wip-us.apache.org/repos/asf/usergrid/blob/2203f433/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/datastax/CQLUtilsTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/datastax/CQLUtilsTest.java b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/datastax/CQLUtilsTest.java index 8ddfa3f..76fcefe 100644 --- a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/datastax/CQLUtilsTest.java +++ b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/datastax/CQLUtilsTest.java @@ -43,9 +43,11 @@ public class CQLUtilsTest { columns.put("column1", "text"); columns.put("value", "blob"); - List<String> primaryKeys = new ArrayList<>(); - primaryKeys.add("key"); - primaryKeys.add("column1"); + List<String> partitionKeys = new ArrayList<>(); + partitionKeys.add("key"); + + List<String> columnKeys = new ArrayList<>(); + columnKeys.add("column1"); Map<String, String> clusteringOrder = new HashMap<>(); clusteringOrder.put("column1", "DESC"); @@ -54,7 +56,8 @@ public class CQLUtilsTest { TableDefinition table1 = new TableDefinition( "table1", - primaryKeys, + partitionKeys, + columnKeys, columns, TableDefinition.CacheOption.KEYS, clusteringOrder @@ -65,8 +68,8 @@ public class CQLUtilsTest { assertTrue( createCQL.contains( CQLUtils.CREATE_TABLE ) && !createCQL.contains( CQLUtils.ALTER_TABLE ) ); assertTrue( updateCQL.contains( CQLUtils.ALTER_TABLE ) && !updateCQL.contains( CQLUtils.CREATE_TABLE ) ); - //logger.info("CREATE: {}", createCQL); - //logger.info("UPDATE: {}", updateCQL); + logger.info("CREATE: {}", createCQL); + logger.info("UPDATE: {}", updateCQL); } http://git-wip-us.apache.org/repos/asf/usergrid/blob/2203f433/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/datastax/TableDefinitionTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/datastax/TableDefinitionTest.java b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/datastax/TableDefinitionTest.java index 792864b..3acce69 100644 --- a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/datastax/TableDefinitionTest.java +++ b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/datastax/TableDefinitionTest.java @@ -32,7 +32,7 @@ public class TableDefinitionTest { public void testNullTableName(){ try{ - TableDefinition table1 = new TableDefinition(null, null, null, null, null); + TableDefinition table1 = new TableDefinition(null, null, null, null, null, null); } catch (NullPointerException npe){ assertEquals("Table name cannot be null", npe.getMessage()); } @@ -44,7 +44,7 @@ public class TableDefinitionTest { public void testNullPrimaryKeys(){ try{ - TableDefinition table1 = new TableDefinition("table1", null, null, null, null); + TableDefinition table1 = new TableDefinition("table1", null, null, null, null, null); } catch (NullPointerException npe){ assertEquals("Primary Key(s) cannot be null", npe.getMessage()); } @@ -57,7 +57,7 @@ public class TableDefinitionTest { try{ TableDefinition table1 = new TableDefinition("table1", - new ArrayList<>(), null, null, null); + new ArrayList<>(), null, null, null, null); } catch (NullPointerException npe){ assertEquals("Columns cannot be null", npe.getMessage()); } @@ -71,7 +71,7 @@ public class TableDefinitionTest { try{ TableDefinition table1 = new TableDefinition("table1", new ArrayList<>(), - new HashMap<>(), null, null); + new ArrayList<>(), new HashMap<>(), null, null); } catch (NullPointerException npe){ assertEquals("CacheOption cannot be null", npe.getMessage()); } http://git-wip-us.apache.org/repos/asf/usergrid/blob/2203f433/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapManagerImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapManagerImpl.java b/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapManagerImpl.java index 501ade7..ae05057 100644 --- a/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapManagerImpl.java +++ b/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapManagerImpl.java @@ -51,7 +51,6 @@ public class MapManagerImpl implements MapManager { return mapSerialization.getString( scope, key ); } - @Override public String getStringHighConsistency( final String key ) { return mapSerialization.getStringHighConsistency(scope, key); http://git-wip-us.apache.org/repos/asf/usergrid/blob/2203f433/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerializationImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerializationImpl.java b/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerializationImpl.java index f90f80c..5fc6ee1 100644 --- a/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerializationImpl.java +++ b/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerializationImpl.java @@ -20,6 +20,7 @@ package org.apache.usergrid.persistence.map.impl; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -28,6 +29,10 @@ import java.util.List; import java.util.Map; import java.util.UUID; +import com.datastax.driver.core.*; +import com.datastax.driver.core.querybuilder.Clause; +import com.datastax.driver.core.querybuilder.QueryBuilder; +import com.datastax.driver.core.querybuilder.Using; import org.apache.cassandra.db.marshal.BytesType; import org.apache.cassandra.db.marshal.UTF8Type; @@ -39,6 +44,7 @@ import org.apache.usergrid.persistence.core.astyanax.MultiTenantColumnFamily; import org.apache.usergrid.persistence.core.astyanax.MultiTenantColumnFamilyDefinition; import org.apache.usergrid.persistence.core.astyanax.ScopedRowKey; import org.apache.usergrid.persistence.core.astyanax.ScopedRowKeySerializer; +import org.apache.usergrid.persistence.core.datastax.CQLUtils; import org.apache.usergrid.persistence.core.shard.ExpandingShardLocator; import org.apache.usergrid.persistence.core.shard.StringHashUtils; import org.apache.usergrid.persistence.map.MapScope; @@ -47,17 +53,9 @@ import com.google.common.base.Preconditions; import com.google.common.hash.Funnel; import com.google.inject.Inject; import com.google.inject.Singleton; -import com.netflix.astyanax.ColumnListMutation; import com.netflix.astyanax.Keyspace; -import com.netflix.astyanax.MutationBatch; -import com.netflix.astyanax.connectionpool.exceptions.ConnectionException; -import com.netflix.astyanax.connectionpool.exceptions.NotFoundException; -import com.netflix.astyanax.model.Column; import com.netflix.astyanax.model.CompositeBuilder; import com.netflix.astyanax.model.CompositeParser; -import com.netflix.astyanax.model.ConsistencyLevel; -import com.netflix.astyanax.model.Row; -import com.netflix.astyanax.model.Rows; import com.netflix.astyanax.serializers.BooleanSerializer; import com.netflix.astyanax.serializers.StringSerializer; @@ -65,6 +63,9 @@ import com.netflix.astyanax.serializers.StringSerializer; @Singleton public class MapSerializationImpl implements MapSerialization { + private static final String MAP_KEYS_TABLE = CQLUtils.quote("Map_Keys"); + private static final String MAP_ENTRIES_TABLE = CQLUtils.quote("Map_Entries"); + private static final MapKeySerializer KEY_SERIALIZER = new MapKeySerializer(); private static final BucketScopedRowKeySerializer<String> MAP_KEY_SERIALIZER = @@ -81,7 +82,7 @@ public class MapSerializationImpl implements MapSerialization { private static final StringSerializer STRING_SERIALIZER = StringSerializer.get(); - private static final StringResultsBuilder STRING_RESULTS_BUILDER = new StringResultsBuilder(); + private static final StringResultsBuilderCQL STRING_RESULTS_BUILDER_CQL = new StringResultsBuilderCQL(); /** @@ -117,137 +118,111 @@ public class MapSerializationImpl implements MapSerialization { private final Keyspace keyspace; private final CassandraConfig cassandraConfig; + private final Session session; + @Inject - public MapSerializationImpl( final Keyspace keyspace, final CassandraConfig cassandraConfig ) { + public MapSerializationImpl( final Keyspace keyspace, final CassandraConfig cassandraConfig, + final Session session ) { this.keyspace = keyspace; + this.session = session; this.cassandraConfig = cassandraConfig; } @Override public String getString( final MapScope scope, final String key ) { - Column<Boolean> col = getValue( scope, key, cassandraConfig.getReadCL() ); - return ( col != null ) ? col.getStringValue() : null; + + ByteBuffer value = getValueCQL( scope, key, cassandraConfig.getDataStaxReadCl() ) ; + return value != null ? (String)DataType.text().deserialize(value,ProtocolVersion.NEWEST_SUPPORTED ): null; } @Override public String getStringHighConsistency( final MapScope scope, final String key ) { - Column<Boolean> col = getValue( scope, key, cassandraConfig.getConsistentReadCL() ); // TODO: why boolean? - return ( col != null ) ? col.getStringValue() : null; + + ByteBuffer value = getValueCQL( scope, key, cassandraConfig.getDataStaxReadConsistentCl() ) ; + return value != null ? (String)DataType.text().deserialize(value,ProtocolVersion.NEWEST_SUPPORTED ): null; } @Override public Map<String, String> getStrings( final MapScope scope, final Collection<String> keys ) { - return getValues( scope, keys, STRING_RESULTS_BUILDER ); + return getValuesCQL( scope, keys, STRING_RESULTS_BUILDER_CQL ); } @Override public void putString( final MapScope scope, final String key, final String value ) { - final RowOp op = new RowOp() { - @Override - public void putValue( final ColumnListMutation<Boolean> columnListMutation ) { - columnListMutation.putColumn( true, value ); - } - - @Override - public void putKey( final ColumnListMutation<String> keysMutation ) { - keysMutation.putColumn( key, true ); - } - }; - - - writeString( scope, key, value, op ); + writeStringCQL( scope, key, value, -1 ); } @Override public void putString( final MapScope scope, final String key, final String value, final int ttl ) { - Preconditions.checkArgument( ttl > 0, "ttl must be > than 0" ); - - final RowOp op = new RowOp() { - @Override - public void putValue( final ColumnListMutation<Boolean> columnListMutation ) { - columnListMutation.putColumn( true, value, ttl ); - } - - @Override - public void putKey( final ColumnListMutation<String> keysMutation ) { - keysMutation.putColumn( key, true, ttl ); - } - }; - - - writeString( scope, key, value, op ); + Preconditions.checkArgument( ttl > 0, "ttl must be > than 0" ); + writeStringCQL( scope, key, value, ttl ); } /** * Write our string index with the specified row op */ - private void writeString( final MapScope scope, final String key, final String value, final RowOp rowOp ) { + private void writeStringCQL( final MapScope scope, final String key, final String value, int ttl ) { Preconditions.checkNotNull( scope, "mapscope is required" ); Preconditions.checkNotNull( key, "key is required" ); Preconditions.checkNotNull( value, "value is required" ); - final MutationBatch batch = keyspace.prepareMutationBatch(); - - //add it to the entry - final ScopedRowKey<MapEntryKey> entryRowKey = MapEntryKey.fromKey( scope, key ); + Statement mapEntry; + Statement mapKey; + if (ttl > 0){ + Using timeToLive = QueryBuilder.ttl(ttl); - //serialize to the - // entry + mapEntry = QueryBuilder.insertInto(MAP_ENTRIES_TABLE) + .using(timeToLive) + .value("key", getMapEntryPartitionKey(scope, key)) + .value("column1", DataType.cboolean().serialize(true, ProtocolVersion.NEWEST_SUPPORTED)) + .value("value", DataType.text().serialize(value, ProtocolVersion.NEWEST_SUPPORTED)); - rowOp.putValue( batch.withRow( MAP_ENTRIES, entryRowKey ) ); + final int bucket = BUCKET_LOCATOR.getCurrentBucket( key ); + mapKey = QueryBuilder.insertInto(MAP_KEYS_TABLE) + .using(timeToLive) + .value("key", getMapKeyPartitionKey(scope, key, bucket)) + .value("column1", DataType.cboolean().serialize(true, ProtocolVersion.NEWEST_SUPPORTED)) + .value("value", DataType.text().serialize(value, ProtocolVersion.NEWEST_SUPPORTED)); + }else{ + mapEntry = QueryBuilder.insertInto(MAP_ENTRIES_TABLE) + .value("key", getMapEntryPartitionKey(scope, key)) + .value("column1", DataType.cboolean().serialize(true, ProtocolVersion.NEWEST_SUPPORTED)) + .value("value", DataType.text().serialize(value, ProtocolVersion.NEWEST_SUPPORTED)); - //add it to the keys + // get a bucket number for the map keys table + final int bucket = BUCKET_LOCATOR.getCurrentBucket( key ); - final int bucket = BUCKET_LOCATOR.getCurrentBucket( key ); - - final BucketScopedRowKey<String> keyRowKey = BucketScopedRowKey.fromKey( scope.getApplication(), key, bucket ); - - //serialize to the entry + mapKey = QueryBuilder.insertInto(MAP_KEYS_TABLE) + .value("key", getMapKeyPartitionKey(scope, key, bucket)) + .value("column1", DataType.cboolean().serialize(true, ProtocolVersion.NEWEST_SUPPORTED)) + .value("value", DataType.text().serialize(value, ProtocolVersion.NEWEST_SUPPORTED)); - rowOp.putKey( batch.withRow( MAP_KEYS, keyRowKey ) ); + } + session.execute(mapEntry); + session.execute(mapKey); - executeBatch( batch ); } - /** - * Callbacks for performing row operations - */ - private static interface RowOp { - - /** - * Callback to do the row - * - * @param columnListMutation The column mutation - */ - void putValue( final ColumnListMutation<Boolean> columnListMutation ); - - - /** - * Write the key - */ - void putKey( final ColumnListMutation<String> keysMutation ); - } - @Override public UUID getUuid( final MapScope scope, final String key ) { - Column<Boolean> col = getValue( scope, key, cassandraConfig.getReadCL() ); - return ( col != null ) ? col.getUUIDValue() : null; + ByteBuffer value = getValueCQL( scope, key, cassandraConfig.getDataStaxReadCl() ); + return value != null ? (UUID)DataType.uuid().deserialize(value, ProtocolVersion.NEWEST_SUPPORTED ) : null; } @@ -258,31 +233,34 @@ public class MapSerializationImpl implements MapSerialization { Preconditions.checkNotNull( key, "key is required" ); Preconditions.checkNotNull( putUuid, "value is required" ); - final MutationBatch batch = keyspace.prepareMutationBatch(); - //add it to the entry - final ScopedRowKey<MapEntryKey> entryRowKey = MapEntryKey.fromKey( scope, key ); + Statement mapEntry = QueryBuilder.insertInto(MAP_ENTRIES_TABLE) + .value("key", getMapEntryPartitionKey(scope, key)) + .value("column1", DataType.cboolean().serialize(true, ProtocolVersion.NEWEST_SUPPORTED)) + .value("value", DataType.uuid().serialize(putUuid, ProtocolVersion.NEWEST_SUPPORTED)); - //serialize to the entry - batch.withRow( MAP_ENTRIES, entryRowKey ).putColumn( true, putUuid ); + session.execute(mapEntry); - //add it to the keys final int bucket = BUCKET_LOCATOR.getCurrentBucket( key ); + Statement mapKey; + mapKey = QueryBuilder.insertInto(MAP_KEYS_TABLE) + .value("key", getMapKeyPartitionKey(scope, key, bucket)) + .value("column1", DataType.text().serialize(key, ProtocolVersion.NEWEST_SUPPORTED)) + .value("value", DataType.serializeValue(null, ProtocolVersion.NEWEST_SUPPORTED)); + + session.execute(mapKey); + } - final BucketScopedRowKey<String> keyRowKey = BucketScopedRowKey.fromKey( scope.getApplication(), key, bucket ); - //serialize to the entry - batch.withRow( MAP_KEYS, keyRowKey ).putColumn( key, true ); - executeBatch( batch ); - } @Override public Long getLong( final MapScope scope, final String key ) { - Column<Boolean> col = getValue( scope, key, cassandraConfig.getReadCL() ); - return ( col != null ) ? col.getLongValue() : null; + + ByteBuffer value = getValueCQL( scope, key, cassandraConfig.getDataStaxReadCl()); + return value != null ? (Long)DataType.bigint().deserialize(value, ProtocolVersion.NEWEST_SUPPORTED ) : null; } @@ -293,46 +271,50 @@ public class MapSerializationImpl implements MapSerialization { Preconditions.checkNotNull( key, "key is required" ); Preconditions.checkNotNull( value, "value is required" ); - final MutationBatch batch = keyspace.prepareMutationBatch(); + Statement mapEntry = QueryBuilder.insertInto(MAP_ENTRIES_TABLE) + .value("key", getMapEntryPartitionKey(scope, key)) + .value("column1", DataType.cboolean().serialize(true, ProtocolVersion.NEWEST_SUPPORTED)) + .value("value", DataType.bigint().serialize(value, ProtocolVersion.NEWEST_SUPPORTED)); - //add it to the entry - final ScopedRowKey<MapEntryKey> entryRowKey = MapEntryKey.fromKey( scope, key ); + session.execute(mapEntry); - //serialize to the entry - batch.withRow( MAP_ENTRIES, entryRowKey ).putColumn( true, value ); - //add it to the keys final int bucket = BUCKET_LOCATOR.getCurrentBucket( key ); + Statement mapKey; + mapKey = QueryBuilder.insertInto(MAP_KEYS_TABLE) + .value("key", getMapKeyPartitionKey(scope, key, bucket)) + .value("column1", DataType.text().serialize(key, ProtocolVersion.NEWEST_SUPPORTED)) + .value("value", DataType.serializeValue(null, ProtocolVersion.NEWEST_SUPPORTED)); - final BucketScopedRowKey<String> keyRowKey = BucketScopedRowKey.fromKey( scope.getApplication(), key, bucket ); - - //serialize to the entry - batch.withRow( MAP_KEYS, keyRowKey ).putColumn( key, true ); - - executeBatch( batch ); + session.execute(mapKey); } @Override public void delete( final MapScope scope, final String key ) { - final MutationBatch batch = keyspace.prepareMutationBatch(); - final ScopedRowKey<MapEntryKey> entryRowKey = MapEntryKey.fromKey( scope, key ); - //serialize to the entry - batch.withRow( MAP_ENTRIES, entryRowKey ).delete(); - - //add it to the keys, we're not sure which one it may have come from - final int[] buckets = BUCKET_LOCATOR.getAllBuckets( key ); + Statement deleteMapEntry; + Clause equalsEntryKey = QueryBuilder.eq("key", getMapEntryPartitionKey(scope, key)); + deleteMapEntry = QueryBuilder.delete().from(MAP_ENTRIES_TABLE) + .where(equalsEntryKey); + session.execute(deleteMapEntry); - final List<BucketScopedRowKey<String>> rowKeys = - BucketScopedRowKey.fromRange( scope.getApplication(), key, buckets ); - for ( BucketScopedRowKey<String> rowKey : rowKeys ) { - batch.withRow( MAP_KEYS, rowKey ).deleteColumn( key ); + // not sure which bucket the value is in, execute a delete against them all + final int[] buckets = BUCKET_LOCATOR.getAllBuckets( key ); + List<ByteBuffer> mapKeys = new ArrayList<>(); + for( int bucket : buckets){ + mapKeys.add( getMapKeyPartitionKey(scope, key, bucket)); } - executeBatch( batch ); + Statement deleteMapKey; + Clause inKey = QueryBuilder.in("key", mapKeys); + deleteMapKey = QueryBuilder.delete().from(MAP_KEYS_TABLE) + .where(inKey); + session.execute(deleteMapKey); + + } @@ -353,72 +335,38 @@ public class MapSerializationImpl implements MapSerialization { } - private Column<Boolean> getValue( MapScope scope, String key, final ConsistencyLevel consistencyLevel ) { + private ByteBuffer getValueCQL( MapScope scope, String key, final ConsistencyLevel consistencyLevel ) { - //add it to the entry - final ScopedRowKey<MapEntryKey> entryRowKey = MapEntryKey.fromKey( scope, key ); + Clause in = QueryBuilder.in("key", getMapEntryPartitionKey(scope, key) ); + Statement statement = QueryBuilder.select().all().from(MAP_ENTRIES_TABLE) + .where(in) + .setConsistencyLevel(consistencyLevel); - //now get all columns, including the "old row key value" - try { - final Column<Boolean> result = - keyspace.prepareQuery( MAP_ENTRIES ).setConsistencyLevel( consistencyLevel ).getKey( entryRowKey ).getColumn( true ).execute().getResult(); + ResultSet resultSet = session.execute(statement); + com.datastax.driver.core.Row row = resultSet.one(); - return result; - } - catch ( NotFoundException nfe ) { - //nothing to return - return null; - } - catch ( ConnectionException e ) { - throw new RuntimeException( "Unable to connect to cassandra", e ); - } + return row != null ? row.getBytes("value") : null; } - /** - * Get multiple values, using the string builder - */ - private <T> T getValues( final MapScope scope, final Collection<String> keys, final ResultsBuilder<T> builder ) { - - final List<ScopedRowKey<MapEntryKey>> rowKeys = new ArrayList<>( keys.size() ); + private <T> T getValuesCQL( final MapScope scope, final Collection<String> keys, final ResultsBuilderCQL<T> builder ) { - for ( final String key : keys ) { - //add it to the entry - final ScopedRowKey<MapEntryKey> entryRowKey = MapEntryKey.fromKey( scope, key ); + final List<ByteBuffer> serializedKeys = new ArrayList<>(); - rowKeys.add( entryRowKey ); - } + keys.forEach(key -> serializedKeys.add(getMapEntryPartitionKey(scope,key))); + Clause in = QueryBuilder.in("key", serializedKeys ); + Statement statement = QueryBuilder.select().all().from(MAP_ENTRIES_TABLE) + .where(in); - //now get all columns, including the "old row key value" - try { - final Rows<ScopedRowKey<MapEntryKey>, Boolean> rows = - keyspace.prepareQuery( MAP_ENTRIES ).setConsistencyLevel( cassandraConfig.getReadCL() ).getKeySlice( - rowKeys ).withColumnSlice( true ).execute() - .getResult(); + ResultSet resultSet = session.execute(statement); - return builder.buildResults( rows ); - } - catch ( NotFoundException nfe ) { - //nothing to return - return null; - } - catch ( ConnectionException e ) { - throw new RuntimeException( "Unable to connect to cassandra", e ); - } + return builder.buildResultsCQL( resultSet ); } - private void executeBatch( MutationBatch batch ) { - try { - batch.execute(); - } - catch ( ConnectionException e ) { - throw new RuntimeException( "Unable to connect to cassandra", e ); - } - } /** @@ -491,37 +439,114 @@ public class MapSerializationImpl implements MapSerialization { } + /** * Build the results from the row keys */ - private static interface ResultsBuilder<T> { - public T buildResults( final Rows<ScopedRowKey<MapEntryKey>, Boolean> rows ); + private interface ResultsBuilderCQL<T> { + + T buildResultsCQL( final ResultSet resultSet ); } - public static class StringResultsBuilder implements ResultsBuilder<Map<String, String>> { + public static class StringResultsBuilderCQL implements ResultsBuilderCQL<Map<String, String>> { @Override - public Map<String, String> buildResults( final Rows<ScopedRowKey<MapEntryKey>, Boolean> rows ) { - final int size = rows.size(); + public Map<String, String> buildResultsCQL( final ResultSet resultSet ) { - final Map<String, String> results = new HashMap<>( size ); - for ( int i = 0; i < size; i++ ) { + final Map<String, String> results = new HashMap<>(); - final Row<ScopedRowKey<MapEntryKey>, Boolean> row = rows.getRowByIndex( i ); + resultSet.all().forEach( row -> { - final String value = row.getColumns().getStringValue( true, null ); + @SuppressWarnings("unchecked") + List<Object> keys = (List) deserializeMapEntryKey(row.getBytes("key")); + String value = (String)DataType.text().deserialize( row.getBytes("value"), + ProtocolVersion.NEWEST_SUPPORTED ); - if ( value == null ) { - continue; - } + // the actual string key value is the last element + results.put((String)keys.get(keys.size() -1), value); - results.put( row.getKey().getKey().key, value ); - } + }); return results; } } + + private static Object deserializeMapEntryKey(ByteBuffer bb){ + + List<Object> stuff = new ArrayList<>(); + while(bb.hasRemaining()){ + ByteBuffer data = CQLUtils.getWithShortLength(bb); + if(stuff.size() == 0){ + stuff.add(DataType.uuid().deserialize(data.slice(), ProtocolVersion.NEWEST_SUPPORTED)); + }else{ + stuff.add(DataType.text().deserialize(data.slice(), ProtocolVersion.NEWEST_SUPPORTED)); + } + byte equality = bb.get(); // we don't use this but take the equality byte off the buffer + + } + + return stuff; + + } + + public static ByteBuffer serializeKeys(UUID ownerUUID, String ownerType, String mapName, String mapKey, + int bucketNumber ){ + + List<Object> keys = new ArrayList<>(4); + keys.add(0, ownerUUID); + keys.add(1, ownerType); + keys.add(2, mapName); + keys.add(3, mapKey); + + if( bucketNumber > 0){ + keys.add(4, bucketNumber); + } + + // UUIDs are 16 bytes, allocate the buffer accordingly + int size = 16+ownerType.length()+mapName.length()+mapKey.length(); + if(bucketNumber > 0 ){ + // ints are 4 bytes + size += 4; + } + + // we always need to add length for the 2 byte short and 1 byte equality + size += keys.size()*3; + + ByteBuffer stuff = ByteBuffer.allocate(size); + + for (Object key : keys) { + + ByteBuffer kb = DataType.serializeValue(key, ProtocolVersion.NEWEST_SUPPORTED); + if (kb == null) { + kb = ByteBuffer.allocate(0); + } + + stuff.putShort((short) kb.remaining()); + stuff.put(kb.slice()); + stuff.put((byte) 0); + + + } + stuff.flip(); + return stuff.duplicate(); + + } + + + private ByteBuffer getMapEntryPartitionKey(MapScope scope, String key){ + + return serializeKeys(scope.getApplication().getUuid(), + scope.getApplication().getType(), scope.getName(), key, -1); + + } + + private ByteBuffer getMapKeyPartitionKey(MapScope scope, String key, int bucketNumber){ + + return serializeKeys(scope.getApplication().getUuid(), + scope.getApplication().getType(), scope.getName(), key, bucketNumber); + + } } http://git-wip-us.apache.org/repos/asf/usergrid/blob/2203f433/stack/corepersistence/map/src/test/java/org/apache/usergrid/persistence/map/MapManagerTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/map/src/test/java/org/apache/usergrid/persistence/map/MapManagerTest.java b/stack/corepersistence/map/src/test/java/org/apache/usergrid/persistence/map/MapManagerTest.java index 41286ab..2a68247 100644 --- a/stack/corepersistence/map/src/test/java/org/apache/usergrid/persistence/map/MapManagerTest.java +++ b/stack/corepersistence/map/src/test/java/org/apache/usergrid/persistence/map/MapManagerTest.java @@ -82,6 +82,22 @@ public class MapManagerTest { assertEquals( value, returned ); } + @Test + public void writeReadStringWithLongKey() { + MapManager mm = mmf.createMapManager( this.scope ); + + final String key = "key1234567890123456789012345678901234567890123456789012345678901234567890" + + "123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890" + + "123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890"; + final String value = "value"; + + mm.putString( key, value ); + + final String returned = mm.getString( key ); + + assertEquals( value, returned ); + } + @Test public void multiReadNoKey() {
