Repository: gora Updated Branches: refs/heads/master dc76da920 -> 3cacf2a25
forward port of Make Cassandra keyspace consistency configurable within gora.properties Project: http://git-wip-us.apache.org/repos/asf/gora/repo Commit: http://git-wip-us.apache.org/repos/asf/gora/commit/68302e21 Tree: http://git-wip-us.apache.org/repos/asf/gora/tree/68302e21 Diff: http://git-wip-us.apache.org/repos/asf/gora/diff/68302e21 Branch: refs/heads/master Commit: 68302e213747700ca38cef546f05d0f15a3e99b4 Parents: dc76da9 Author: Lewis John McGibbney <[email protected]> Authored: Fri Aug 15 09:49:14 2014 -0700 Committer: Lewis John McGibbney <[email protected]> Committed: Fri Aug 15 09:49:14 2014 -0700 ---------------------------------------------------------------------- .../gora/cassandra/store/CassandraClient.java | 131 ++++++++++++++----- .../gora/cassandra/store/CassandraStore.java | 25 +++- gora-cassandra/src/test/conf/gora.properties | 15 +-- .../gora/dynamodb/store/DynamoDBStore.java | 8 +- 4 files changed, 133 insertions(+), 46 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/gora/blob/68302e21/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java ---------------------------------------------------------------------- diff --git a/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java b/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java index 1d56e32..9e8cd7b 100644 --- a/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java +++ b/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java @@ -18,11 +18,16 @@ package org.apache.gora.cassandra.store; +import static org.apache.gora.cassandra.store.CassandraStore.colFamConsLvl; +import static org.apache.gora.cassandra.store.CassandraStore.readOpConsLvl; +import static org.apache.gora.cassandra.store.CassandraStore.writeOpConsLvl; + import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Properties; import me.prettyprint.cassandra.model.ConfigurableConsistencyLevel; import me.prettyprint.cassandra.serializers.ByteBufferSerializer; @@ -57,7 +62,20 @@ import org.apache.gora.query.Query; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +/** + * CassandraClient is where all of the primary datastore functionality is + * executed. Typically CassandraClient is invoked by calling + * {@link org.apache.gora.cassandra.store.CassandraStore#initialize(Class, Class, Properties)}. + * CassandraClient deals with Cassandra data model definition, mutation, + * and general/specific mappings. + * @see {@link org.apache.gora.cassandra.store.CassandraStore#initialize(Class, Class, Properties)} + * + * @param <K> + * @param <T> + */ public class CassandraClient<K, T extends PersistentBase> { + + /** The logging implementation */ public static final Logger LOG = LoggerFactory.getLogger(CassandraClient.class); private Cluster cluster; @@ -66,10 +84,29 @@ public class CassandraClient<K, T extends PersistentBase> { private Class<K> keyClass; private Class<T> persistentClass; + /** Object which holds the XML mapping for Cassandra. */ private CassandraMapping cassandraMapping = null; + /** Hector client default column family consistency level. */ + public static final String DEFAULT_HECTOR_CONSIS_LEVEL = "QUORUM"; + + /** Cassandra serializer to be used for serializing Gora's keys. */ private Serializer<K> keySerializer; + /** + * Given our key, persistentClass from + * {@link org.apache.gora.cassandra.store.CassandraStore#initialize(Class, Class, Properties)} + * we make best efforts to dictate our data model. + * We make a quick check within {@link org.apache.gora.cassandra.store.CassandraClient#checkKeyspace(String) + * to see if our keyspace has already been invented, this simple check prevents us from + * recreating the keyspace if it already exists. + * We then simple specify (based on the input keyclass) an appropriate serializer + * via {@link org.apache.gora.cassandra.serializers.GoraSerializerTypeInferer} before + * defining a mutator from and by which we can mutate this object. + * @param keyClass the Key by which we wish o assign a record object + * @param persistentClass the generated {@link org.apache.org.gora.persistency.Peristent} bean representing the data. + * @throws Exception + */ public void initialize(Class<K> keyClass, Class<T> persistentClass) throws Exception { this.keyClass = keyClass; @@ -77,12 +114,14 @@ public class CassandraClient<K, T extends PersistentBase> { this.persistentClass = persistentClass; this.cassandraMapping = CassandraMappingManager.getManager().get(persistentClass); - this.cluster = HFactory.getOrCreateCluster(this.cassandraMapping.getClusterName(), new CassandraHostConfigurator(this.cassandraMapping.getHostName())); + this.cluster = HFactory.getOrCreateCluster(this.cassandraMapping.getClusterName(), + new CassandraHostConfigurator(this.cassandraMapping.getHostName())); // add keyspace to cluster checkKeyspace(); - // Just create a Keyspace object on the client side, corresponding to an already existing keyspace with already created column families. + // Just create a Keyspace object on the client side, corresponding to an already + // existing keyspace with already created column families. this.keyspace = HFactory.createKeyspace(this.cassandraMapping.getKeyspaceName(), this.cluster); this.keySerializer = GoraSerializerTypeInferer.getSerializer(keyClass); @@ -99,10 +138,17 @@ public class CassandraClient<K, T extends PersistentBase> { /** * Check if keyspace already exists. If not, create it. - * In this method, we also utilise Hector's {@ConfigurableConsistencyLevel} - * logic. It is set by passing a ConfigurableConsistencyLevel object right - * when the Keyspace is created. Currently consistency level is .ONE which - * permits consistency to wait until one replica has responded. + * In this method, we also utilize Hector's + * {@link me.prettyprint.cassandra.model.ConfigurableConsistencyLevel} logic. + * It is set by passing a + * {@link me.prettyprint.cassandra.model.ConfigurableConsistencyLevel} object right + * when the {@link me.prettyprint.hector.api.Keyspace} is created. + * If we cannot find a consistency level within <code>gora.properites</code>, + * then column family consistency level is set to QUORUM (by default) which permits + * consistency to wait for a quorum of replicas to respond regardless of data center. + * QUORUM is Hector Client's default setting and we respect that here as well. + * + * @see http://hector-client.github.io/hector/build/html/content/consistency_level.html */ public void checkKeyspace() { // "describe keyspace <keyspaceName>;" query @@ -116,29 +162,29 @@ public class CassandraClient<K, T extends PersistentBase> { } keyspaceDefinition = HFactory.createKeyspaceDefinition( - this.cassandraMapping.getKeyspaceName(), + this.cassandraMapping.getKeyspaceName(), this.cassandraMapping.getKeyspaceReplicationStrategy(), this.cassandraMapping.getKeyspaceReplicationFactor(), columnFamilyDefinitions ); this.cluster.addKeyspace(keyspaceDefinition, true); - // LOG.info("Keyspace '" + this.cassandraMapping.getKeyspaceName() + "' in cluster '" + this.cassandraMapping.getClusterName() + "' was created on host '" + this.cassandraMapping.getHostName() + "'"); - // Create a customized Consistency Level - ConfigurableConsistencyLevel configurableConsistencyLevel = new ConfigurableConsistencyLevel(); - Map<String, HConsistencyLevel> clmap = new HashMap<String, HConsistencyLevel>(); - - // Define CL.ONE for ColumnFamily "ColumnFamily" - clmap.put("ColumnFamily", HConsistencyLevel.ONE); - - // In this we use CL.ONE for read and writes. But you can use different CLs if needed. - configurableConsistencyLevel.setReadCfConsistencyLevels(clmap); - configurableConsistencyLevel.setWriteCfConsistencyLevels(clmap); - - // Then let the keyspace know - HFactory.createKeyspace("Keyspace", this.cluster, configurableConsistencyLevel); - + // GORA-167 Create a customized Consistency Level + ConfigurableConsistencyLevel ccl = new ConfigurableConsistencyLevel(); + Map<String, HConsistencyLevel> clmap = getConsisLevelForColFams(columnFamilyDefinitions); + // Column family consistency levels + ccl.setReadCfConsistencyLevels(clmap); + ccl.setWriteCfConsistencyLevels(clmap); + // Operations consistency levels + String opConsisLvl = (readOpConsLvl!=null || !readOpConsLvl.isEmpty())?readOpConsLvl:DEFAULT_HECTOR_CONSIS_LEVEL; + ccl.setDefaultReadConsistencyLevel(HConsistencyLevel.valueOf(opConsisLvl)); + LOG.debug("Hector read consistency configured to '" + opConsisLvl + "'."); + opConsisLvl = (writeOpConsLvl!=null || !writeOpConsLvl.isEmpty())?writeOpConsLvl:DEFAULT_HECTOR_CONSIS_LEVEL; + ccl.setDefaultWriteConsistencyLevel(HConsistencyLevel.valueOf(opConsisLvl)); + LOG.debug("Hector write consistency configured to '" + opConsisLvl + "'."); + + HFactory.createKeyspace("Keyspace", this.cluster, ccl); keyspaceDefinition = null; } else { @@ -164,6 +210,22 @@ public class CassandraClient<K, T extends PersistentBase> { } /** + * Method in charge of setting the consistency level for defined column families. + * @param pColFams Column families + * @return Map<String, HConsistencyLevel> with the mapping between colFams and consistency level. + */ + private Map<String, HConsistencyLevel> getConsisLevelForColFams(List<ColumnFamilyDefinition> pColFams) { + Map<String, HConsistencyLevel> clMap = new HashMap<String, HConsistencyLevel>(); + // Get columnFamily consistency level. + String colFamConsisLvl = (colFamConsLvl != null && !colFamConsLvl.isEmpty())?colFamConsLvl:DEFAULT_HECTOR_CONSIS_LEVEL; + LOG.debug("ColumnFamily consistency level configured to '" + colFamConsisLvl + "'."); + // Define consistency for ColumnFamily "ColumnFamily" + for (ColumnFamilyDefinition colFamDef : pColFams) + clMap.put(colFamDef.getName(), HConsistencyLevel.valueOf(colFamConsisLvl)); + return clMap; + } + + /** * Drop keyspace. */ public void dropKeyspace() { @@ -259,10 +321,10 @@ public class CassandraClient<K, T extends PersistentBase> { /** * Adds an subColumn inside the cassandraMapping file when a String is serialized - * @param key - * @param fieldName - * @param columnName - * @param value + * @param key the row key + * @param fieldName the field name + * @param columnName the column name (the member name, or the index of array) + * @param value the member value */ public void addSubColumn(K key, String fieldName, String columnName, Object value) { addSubColumn(key, fieldName, StringSerializer.get().toByteBuffer(columnName), value); @@ -270,10 +332,10 @@ public class CassandraClient<K, T extends PersistentBase> { /** * Adds an subColumn inside the cassandraMapping file when an Integer is serialized - * @param key - * @param fieldName - * @param columnName - * @param value + * @param key the row key + * @param fieldName the field name + * @param columnName the column name (the member name, or the index of array) + * @param value the member value */ public void addSubColumn(K key, String fieldName, Integer columnName, Object value) { addSubColumn(key, fieldName, IntegerSerializer.get().toByteBuffer(columnName), value); @@ -323,6 +385,7 @@ public class CassandraClient<K, T extends PersistentBase> { //TODO Verify this. Everything that goes inside a genericArray will go inside a column so let's just delete that. deleteColumn(key, cassandraMapping.getFamily(fieldName), toByteBuffer(fieldName)); } + public void addGenericArray(K key, String fieldName, GenericArray<?> array) { if (isSuper( cassandraMapping.getFamily(fieldName) )) { int i= 0; @@ -426,7 +489,8 @@ public class CassandraClient<K, T extends PersistentBase> { K startKey = query.getStartKey(); K endKey = query.getEndKey(); - RangeSlicesQuery<K, ByteBuffer, ByteBuffer> rangeSlicesQuery = HFactory.createRangeSlicesQuery(this.keyspace, this.keySerializer, ByteBufferSerializer.get(), ByteBufferSerializer.get()); + RangeSlicesQuery<K, ByteBuffer, ByteBuffer> rangeSlicesQuery = HFactory.createRangeSlicesQuery + (this.keyspace, this.keySerializer, ByteBufferSerializer.get(), ByteBufferSerializer.get()); rangeSlicesQuery.setColumnFamily(family); rangeSlicesQuery.setKeys(startKey, endKey); rangeSlicesQuery.setRange(ByteBuffer.wrap(new byte[0]), ByteBuffer.wrap(new byte[0]), false, GoraRecordReader.BUFFER_LIMIT_READ_VALUE); @@ -536,7 +600,8 @@ public class CassandraClient<K, T extends PersistentBase> { K startKey = query.getStartKey(); K endKey = query.getEndKey(); - RangeSuperSlicesQuery<K, String, ByteBuffer, ByteBuffer> rangeSuperSlicesQuery = HFactory.createRangeSuperSlicesQuery(this.keyspace, this.keySerializer, StringSerializer.get(), ByteBufferSerializer.get(), ByteBufferSerializer.get()); + RangeSuperSlicesQuery<K, String, ByteBuffer, ByteBuffer> rangeSuperSlicesQuery = HFactory.createRangeSuperSlicesQuery + (this.keyspace, this.keySerializer, StringSerializer.get(), ByteBufferSerializer.get(), ByteBufferSerializer.get()); rangeSuperSlicesQuery.setColumnFamily(family); rangeSuperSlicesQuery.setKeys(startKey, endKey); rangeSuperSlicesQuery.setRange("", "", false, GoraRecordReader.BUFFER_LIMIT_READ_VALUE); @@ -556,6 +621,6 @@ public class CassandraClient<K, T extends PersistentBase> { * @return Keyspace */ public String getKeyspaceName() { - return this.cassandraMapping.getKeyspaceName(); + return this.cassandraMapping.getKeyspaceName(); } } http://git-wip-us.apache.org/repos/asf/gora/blob/68302e21/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java ---------------------------------------------------------------------- diff --git a/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java b/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java index ffb4af0..496f1f0 100644 --- a/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java +++ b/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java @@ -60,6 +60,7 @@ import org.apache.gora.query.PartitionQuery; import org.apache.gora.query.Query; import org.apache.gora.query.Result; import org.apache.gora.query.impl.PartitionQueryImpl; +import org.apache.gora.store.DataStoreFactory; import org.apache.gora.store.impl.DataStoreBase; import org.apache.gora.cassandra.serializers.AvroSerializerUtil; import org.slf4j.Logger; @@ -76,7 +77,21 @@ public class CassandraStore<K, T extends PersistentBase> extends DataStoreBase<K /** Logging implementation */ public static final Logger LOG = LoggerFactory.getLogger(CassandraStore.class); - private CassandraClient<K, T> cassandraClient = new CassandraClient<K, T>(); + /** Consistency property level for Cassandra column families */ + private static final String COL_FAM_CL = "cf.consistency.level"; + + /** Consistency property level for Cassandra read operations. */ + private static final String READ_OP_CL = "read.consistency.level"; + + /** Consistency property level for Cassandra write operations. */ + private static final String WRITE_OP_CL = "write.consistency.level"; + + /** Variables to hold different consistency levels defined by the properties. */ + public static String colFamConsLvl; + public static String readOpConsLvl; + public static String writeOpConsLvl; + + private CassandraClient<K, T> cassandraClient = new CassandraClient<K, T>(); /** * Fixed string with value "UnionIndex" used to generate an extra column based on @@ -126,6 +141,14 @@ public class CassandraStore<K, T extends PersistentBase> extends DataStoreBase<K public void initialize(Class<K> keyClass, Class<T> persistent, Properties properties) { try { super.initialize(keyClass, persistent, properties); + if (autoCreateSchema) { + // If this is not set, then each Cassandra client should set its default + // column family + colFamConsLvl = DataStoreFactory.findProperty(properties, this, COL_FAM_CL, null); + // operations + readOpConsLvl = DataStoreFactory.findProperty(properties, this, READ_OP_CL, null); + writeOpConsLvl = DataStoreFactory.findProperty(properties, this, WRITE_OP_CL, null); + } this.cassandraClient.initialize(keyClass, persistent); } catch (Exception e) { LOG.error(e.getMessage()); http://git-wip-us.apache.org/repos/asf/gora/blob/68302e21/gora-cassandra/src/test/conf/gora.properties ---------------------------------------------------------------------- diff --git a/gora-cassandra/src/test/conf/gora.properties b/gora-cassandra/src/test/conf/gora.properties index 80427b4..6c8b06a 100644 --- a/gora-cassandra/src/test/conf/gora.properties +++ b/gora-cassandra/src/test/conf/gora.properties @@ -14,14 +14,13 @@ # limitations under the License. gora.datastore.default=org.apache.gora.cassandra.CassandraStore -gora.cassandrastore.keyspace= -gora.cassandrastore.name= -gora.cassandrastore.class= -gora.cassandrastore.qualifier= -gora.cassandrastore.family= -gora.cassandrastore.type= -gora.cassandraStore.cluster=Test Cluster -gora.cassandraStore.host=localhost +gora.cassandrastore.cluster=Test Cluster +gora.cassandrastore.host=localhost +# property is annotated in CassandraClient#checkKeyspace() +# options are ANY, ONE, TWO, THREE, LOCAL_QUORUM, EACH_QUORUM, QUORUM and ALL. +gora.cassandrastore.cf.consistency.level=ONE +gora.cassandrastore.read.consistency.level=QUORUM +gora.cassandrastore.write.consistency.level=ONE http://git-wip-us.apache.org/repos/asf/gora/blob/68302e21/gora-dynamodb/src/main/java/org/apache/gora/dynamodb/store/DynamoDBStore.java ---------------------------------------------------------------------- diff --git a/gora-dynamodb/src/main/java/org/apache/gora/dynamodb/store/DynamoDBStore.java b/gora-dynamodb/src/main/java/org/apache/gora/dynamodb/store/DynamoDBStore.java index 7192c8a..ee48542 100644 --- a/gora-dynamodb/src/main/java/org/apache/gora/dynamodb/store/DynamoDBStore.java +++ b/gora-dynamodb/src/main/java/org/apache/gora/dynamodb/store/DynamoDBStore.java @@ -161,11 +161,11 @@ public class DynamoDBStore<K, T extends Persistent> extends WSDataStoreBase<K, T LOG.debug("Initializing DynamoDB store"); getCredentials(); setWsProvider(wsProvider); - preferredSchema = properties.getProperty(PREF_SCH_NAME); - dynamoDBClient = getClient(properties.getProperty(CLI_TYP_PROP),(AWSCredentials)getConf()); - dynamoDBClient.setEndpoint(properties.getProperty(ENDPOINT_PROP)); + preferredSchema = DataStoreFactory.findProperty(properties, this, PREF_SCH_NAME, null); + dynamoDBClient = getClient(DataStoreFactory.findProperty(properties, this, CLI_TYP_PROP, null),(AWSCredentials)getConf()); + dynamoDBClient.setEndpoint(DataStoreFactory.findProperty(properties, this, ENDPOINT_PROP, null)); mapping = readMapping(); - consistency = properties.getProperty(CONSISTENCY_READS); + consistency = DataStoreFactory.findProperty(properties, this, CONSISTENCY_READS, null); persistentClass = pPersistentClass; } catch (Exception e) {
