Add Query level consistency
Project: http://git-wip-us.apache.org/repos/asf/gora/repo Commit: http://git-wip-us.apache.org/repos/asf/gora/commit/1fba4162 Tree: http://git-wip-us.apache.org/repos/asf/gora/tree/1fba4162 Diff: http://git-wip-us.apache.org/repos/asf/gora/diff/1fba4162 Branch: refs/heads/master Commit: 1fba41622f64af497b9da81fe6aa7de069da32c4 Parents: b1f906c Author: madhawa-gunasekara <[email protected]> Authored: Tue Aug 15 21:57:30 2017 +0530 Committer: madhawa-gunasekara <[email protected]> Committed: Tue Aug 15 21:57:30 2017 +0530 ---------------------------------------------------------------------- .../serializers/AvroCassandraUtils.java | 42 ++++++++++++++----- .../cassandra/serializers/AvroSerializer.java | 44 ++++++++++++++++---- .../serializers/CassandraSerializer.java | 17 +++++++- .../cassandra/serializers/NativeSerializer.java | 31 ++++++++++++-- .../gora/cassandra/store/CassandraClient.java | 23 ++++++++++ .../store/CassandraStoreParameters.java | 10 +++++ .../store/TestAvroSerializationWithUDT.java | 2 + .../cassandra/store/TestCassandraStore.java | 2 + 8 files changed, 147 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/gora/blob/1fba4162/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/AvroCassandraUtils.java ---------------------------------------------------------------------- diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/AvroCassandraUtils.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/AvroCassandraUtils.java index 9d3dc89..9a070e4 100644 --- a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/AvroCassandraUtils.java +++ b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/AvroCassandraUtils.java @@ -192,16 +192,6 @@ class AvroCassandraUtils { return unionSchemaPos; else if (pValue != null && ByteBuffer.class.isAssignableFrom(pValue.getClass()) && schemaType.equals(Schema.Type.FLOAT)) return unionSchemaPos; - else if (pValue != null && ByteBuffer.class.isAssignableFrom(pValue.getClass()) && schemaType.equals(Schema.Type.BOOLEAN)) - return unionSchemaPos; - else if (pValue != null && ByteBuffer.class.isAssignableFrom(pValue.getClass()) && schemaType.equals(Schema.Type.MAP)) - return unionSchemaPos; - else if (pValue != null && ByteBuffer.class.isAssignableFrom(pValue.getClass()) && schemaType.equals(Schema.Type.ARRAY)) - return unionSchemaPos; - else if (pValue != null && ByteBuffer.class.isAssignableFrom(pValue.getClass()) && schemaType.equals(Schema.Type.ENUM)) - return unionSchemaPos; - else if (pValue != null && ByteBuffer.class.isAssignableFrom(pValue.getClass()) && schemaType.equals(Schema.Type.FIXED)) - return unionSchemaPos; else if (pValue != null && ByteBuffer.class.isAssignableFrom(pValue.getClass()) && schemaType.equals(Schema.Type.RECORD)) return unionSchemaPos; unionSchemaPos++; @@ -283,13 +273,43 @@ class AvroCassandraUtils { case STRING: if (value instanceof org.apache.avro.util.Utf8) { result = value; + } else if (ByteBuffer.class.isAssignableFrom(value.getClass())) { + result = new Utf8(((ByteBuffer) value).array()); } else { result = new Utf8((String) value); } break; case INT: - result = value; + if (ByteBuffer.class.isAssignableFrom(value.getClass())) { + result = ((ByteBuffer) value).getInt(); + } else { + result = value; + } + break; + + case FLOAT: + if (ByteBuffer.class.isAssignableFrom(value.getClass())) { + result = ((ByteBuffer) value).getFloat(); + } else { + result = value; + } + break; + + case DOUBLE: + if (ByteBuffer.class.isAssignableFrom(value.getClass())) { + result = ((ByteBuffer) value).getDouble(); + } else { + result = value; + } + break; + + case LONG: + if (ByteBuffer.class.isAssignableFrom(value.getClass())) { + result = ((ByteBuffer) value).getLong(); + } else { + result = value; + } break; default: http://git-wip-us.apache.org/repos/asf/gora/blob/1fba4162/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/AvroSerializer.java ---------------------------------------------------------------------- diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/AvroSerializer.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/AvroSerializer.java index 3037d6c..e064c6e 100644 --- a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/AvroSerializer.java +++ b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/AvroSerializer.java @@ -19,9 +19,11 @@ package org.apache.gora.cassandra.serializers; import com.datastax.driver.core.AbstractGettableData; import com.datastax.driver.core.ColumnDefinitions; +import com.datastax.driver.core.ConsistencyLevel; import com.datastax.driver.core.DataType; import com.datastax.driver.core.ResultSet; import com.datastax.driver.core.Row; +import com.datastax.driver.core.SimpleStatement; import com.datastax.driver.core.UDTValue; import com.datastax.driver.core.UserType; import org.apache.avro.Schema; @@ -114,11 +116,16 @@ class AvroSerializer<K, T extends PersistentBase> extends CassandraSerializer { List<Object> objectArrayList = new ArrayList<>(); String cqlQuery = CassandraQueryFactory.getUpdateByQueryForAvro(mapping, query, objectArrayList, persistentSchema); ResultSet results; + SimpleStatement statement; if (objectArrayList.size() == 0) { - results = client.getSession().execute(cqlQuery); + statement = new SimpleStatement(cqlQuery); } else { - results = client.getSession().execute(cqlQuery, objectArrayList.toArray()); + statement = new SimpleStatement(cqlQuery, objectArrayList.toArray()); } + if (writeConsistencyLevel != null) { + statement.setConsistencyLevel(ConsistencyLevel.valueOf(writeConsistencyLevel)); + } + results = client.getSession().execute(statement); return results.wasApplied(); } @@ -138,7 +145,11 @@ class AvroSerializer<K, T extends PersistentBase> extends CassandraSerializer { ArrayList<Object> cassandraValues = new ArrayList<>(); AvroCassandraUtils.processKeys(mapping, key, cassandraKeys, cassandraValues); String cqlQuery = CassandraQueryFactory.getSelectObjectWithFieldsQuery(mapping, fields, cassandraKeys); - ResultSet resultSet = this.client.getSession().execute(cqlQuery, cassandraValues.toArray()); + SimpleStatement statement = new SimpleStatement(cqlQuery, cassandraValues.toArray()); + if (readConsistencyLevel != null) { + statement.setConsistencyLevel(ConsistencyLevel.valueOf(readConsistencyLevel)); + } + ResultSet resultSet = this.client.getSession().execute(statement); Iterator<Row> iterator = resultSet.iterator(); ColumnDefinitions definitions = resultSet.getColumnDefinitions(); T obj = null; @@ -207,7 +218,11 @@ class AvroSerializer<K, T extends PersistentBase> extends CassandraSerializer { } } String cqlQuery = CassandraQueryFactory.getInsertDataQuery(mapping, fields); - client.getSession().execute(cqlQuery, values.toArray()); + SimpleStatement statement = new SimpleStatement(cqlQuery, values.toArray()); + if (writeConsistencyLevel != null) { + statement.setConsistencyLevel(ConsistencyLevel.valueOf(writeConsistencyLevel)); + } + client.getSession().execute(statement); } else { LOG.info("Ignored putting persistent bean {} in the store as it is neither " + "new, neither dirty.", new Object[]{persistent}); @@ -229,7 +244,11 @@ class AvroSerializer<K, T extends PersistentBase> extends CassandraSerializer { ArrayList<Object> cassandraValues = new ArrayList<>(); AvroCassandraUtils.processKeys(mapping, key, cassandraKeys, cassandraValues); String cqlQuery = CassandraQueryFactory.getSelectObjectQuery(mapping, cassandraKeys); - ResultSet resultSet = this.client.getSession().execute(cqlQuery, cassandraValues.toArray()); + SimpleStatement statement = new SimpleStatement(cqlQuery, cassandraValues.toArray()); + if (readConsistencyLevel != null) { + statement.setConsistencyLevel(ConsistencyLevel.valueOf(readConsistencyLevel)); + } + ResultSet resultSet = client.getSession().execute(statement); Iterator<Row> iterator = resultSet.iterator(); ColumnDefinitions definitions = resultSet.getColumnDefinitions(); T obj = null; @@ -367,7 +386,11 @@ class AvroSerializer<K, T extends PersistentBase> extends CassandraSerializer { ArrayList<Object> cassandraValues = new ArrayList<>(); AvroCassandraUtils.processKeys(mapping, key, cassandraKeys, cassandraValues); String cqlQuery = CassandraQueryFactory.getDeleteDataQuery(mapping, cassandraKeys); - ResultSet resultSet = this.client.getSession().execute(cqlQuery, cassandraValues.toArray()); + SimpleStatement statement = new SimpleStatement(cqlQuery, cassandraValues.toArray()); + if (writeConsistencyLevel != null) { + statement.setConsistencyLevel(ConsistencyLevel.valueOf(writeConsistencyLevel)); + } + ResultSet resultSet = client.getSession().execute(statement); return resultSet.wasApplied(); } @@ -390,11 +413,16 @@ class AvroSerializer<K, T extends PersistentBase> extends CassandraSerializer { CassandraResultSet<K, T> cassandraResult = new CassandraResultSet<>(dataStore, query); String cqlQuery = CassandraQueryFactory.getExecuteQuery(mapping, query, objectArrayList, fields); ResultSet results; + SimpleStatement statement; if (objectArrayList.size() == 0) { - results = client.getSession().execute(cqlQuery); + statement = new SimpleStatement(cqlQuery); } else { - results = client.getSession().execute(cqlQuery, objectArrayList.toArray()); + statement = new SimpleStatement(cqlQuery, objectArrayList.toArray()); + } + if (readConsistencyLevel != null) { + statement.setConsistencyLevel(ConsistencyLevel.valueOf(readConsistencyLevel)); } + results = client.getSession().execute(statement); Iterator<Row> iterator = results.iterator(); ColumnDefinitions definitions = results.getColumnDefinitions(); T obj; http://git-wip-us.apache.org/repos/asf/gora/blob/1fba4162/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/CassandraSerializer.java ---------------------------------------------------------------------- diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/CassandraSerializer.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/CassandraSerializer.java index ab072d5..5f15743 100644 --- a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/CassandraSerializer.java +++ b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/CassandraSerializer.java @@ -17,8 +17,10 @@ package org.apache.gora.cassandra.serializers; +import com.datastax.driver.core.ConsistencyLevel; import com.datastax.driver.core.KeyspaceMetadata; import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.SimpleStatement; import com.datastax.driver.core.TableMetadata; import org.apache.gora.cassandra.bean.Field; import org.apache.gora.cassandra.store.CassandraClient; @@ -51,6 +53,10 @@ public abstract class CassandraSerializer<K, T extends Persistent> { protected CassandraClient client; + protected String readConsistencyLevel; + + protected String writeConsistencyLevel; + protected Map<String, String> userDefineTypeMaps; CassandraSerializer(CassandraClient cc, Class<K> keyClass, Class<T> persistantClass, CassandraMapping mapping) { @@ -58,6 +64,8 @@ public abstract class CassandraSerializer<K, T extends Persistent> { this.persistentClass = persistantClass; this.client = cc; this.mapping = mapping; + this.readConsistencyLevel = client.getReadConsistencyLevel(); + this.writeConsistencyLevel = client.getWriteConsistencyLevel(); } /** @@ -199,11 +207,16 @@ public abstract class CassandraSerializer<K, T extends Persistent> { } else { String cqlQuery = CassandraQueryFactory.getDeleteByQuery(mapping, query, objectArrayList); ResultSet results; + SimpleStatement statement; if (objectArrayList.size() == 0) { - results = client.getSession().execute(cqlQuery); + statement = new SimpleStatement(cqlQuery); } else { - results = client.getSession().execute(cqlQuery, objectArrayList.toArray()); + statement = new SimpleStatement(cqlQuery, objectArrayList.toArray()); + } + if (writeConsistencyLevel != null) { + statement.setConsistencyLevel(ConsistencyLevel.valueOf(writeConsistencyLevel)); } + results = client.getSession().execute(statement); LOG.debug("Delete by Query was applied : " + results.wasApplied()); } LOG.info("Delete By Query method doesn't return the deleted element count."); http://git-wip-us.apache.org/repos/asf/gora/blob/1fba4162/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/NativeSerializer.java ---------------------------------------------------------------------- diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/NativeSerializer.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/NativeSerializer.java index adb5c34..74dbf6a 100644 --- a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/NativeSerializer.java +++ b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/NativeSerializer.java @@ -17,7 +17,9 @@ package org.apache.gora.cassandra.serializers; +import com.datastax.driver.core.ConsistencyLevel; import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.SimpleStatement; import com.datastax.driver.mapping.Mapper; import com.datastax.driver.mapping.MappingManager; import com.datastax.driver.mapping.Result; @@ -57,10 +59,18 @@ class NativeSerializer<K, T extends Persistent> extends CassandraSerializer { this.createSchema(); MappingManager mappingManager = new MappingManager(cassandraClient.getSession()); mapper = mappingManager.mapper(persistentClass); + if (cassandraClient.getWriteConsistencyLevel() != null) { + mapper.setDefaultDeleteOptions(Mapper.Option.consistencyLevel(ConsistencyLevel.valueOf(cassandraClient.getWriteConsistencyLevel()))); + mapper.setDefaultSaveOptions(Mapper.Option.consistencyLevel(ConsistencyLevel.valueOf(cassandraClient.getWriteConsistencyLevel()))); + } + if (cassandraClient.getReadConsistencyLevel() != null) { + mapper.setDefaultGetOptions(Mapper.Option.consistencyLevel(ConsistencyLevel.valueOf(cassandraClient.getReadConsistencyLevel()))); + } } /** * {@inheritDoc} + * * @param key * @param value */ @@ -72,6 +82,7 @@ class NativeSerializer<K, T extends Persistent> extends CassandraSerializer { /** * {@inheritDoc} + * * @param key * @return */ @@ -88,6 +99,7 @@ class NativeSerializer<K, T extends Persistent> extends CassandraSerializer { /** * {@inheritDoc} + * * @param key * @return */ @@ -100,6 +112,7 @@ class NativeSerializer<K, T extends Persistent> extends CassandraSerializer { /** * {@inheritDoc} + * * @param key * @param fields * @return @@ -110,7 +123,11 @@ class NativeSerializer<K, T extends Persistent> extends CassandraSerializer { fields = getFields(); } String cqlQuery = CassandraQueryFactory.getSelectObjectWithFieldsQuery(mapping, fields); - ResultSet results = client.getSession().execute(cqlQuery, key); + SimpleStatement statement = new SimpleStatement(cqlQuery, key); + if (readConsistencyLevel != null) { + statement.setConsistencyLevel(ConsistencyLevel.valueOf(readConsistencyLevel)); + } + ResultSet results = client.getSession().execute(statement); Result<T> objects = mapper.map(results); List<T> objectList = objects.all(); if (objectList != null) { @@ -123,6 +140,7 @@ class NativeSerializer<K, T extends Persistent> extends CassandraSerializer { /** * {@inheritDoc} + * * @throws Exception */ @Override @@ -140,6 +158,7 @@ class NativeSerializer<K, T extends Persistent> extends CassandraSerializer { /** * {@inheritDoc} + * * @param query * @return */ @@ -148,16 +167,22 @@ class NativeSerializer<K, T extends Persistent> extends CassandraSerializer { List<Object> objectArrayList = new ArrayList<>(); String cqlQuery = CassandraQueryFactory.getUpdateByQueryForNative(mapping, query, objectArrayList); ResultSet results; + SimpleStatement statement; if (objectArrayList.size() == 0) { - results = client.getSession().execute(cqlQuery); + statement = new SimpleStatement(cqlQuery); } else { - results = client.getSession().execute(cqlQuery, objectArrayList.toArray()); + statement = new SimpleStatement(cqlQuery, objectArrayList.toArray()); + } + if (writeConsistencyLevel != null) { + statement.setConsistencyLevel(ConsistencyLevel.valueOf(writeConsistencyLevel)); } + results = client.getSession().execute(statement); return results.wasApplied(); } /** * {@inheritDoc} + * * @param dataStore * @param query * @return http://git-wip-us.apache.org/repos/asf/gora/blob/1fba4162/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java ---------------------------------------------------------------------- diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java index 9a49a38..4e1d8bd 100644 --- a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java +++ b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java @@ -74,6 +74,10 @@ public class CassandraClient { private CassandraMapping mapping; + private String readConsistencyLevel; + + private String writeConsistencyLevel; + public Session getSession() { return session; } @@ -91,6 +95,8 @@ public class CassandraClient { if (codecs != null) { registerCustomCodecs(codecs); } + readConsistencyLevel = properties.getProperty(CassandraStoreParameters.READ_CONSISTENCY_LEVEL); + writeConsistencyLevel = properties.getProperty(CassandraStoreParameters.WRITE_CONSISTENCY_LEVEL); registerOptionalCodecs(); this.session = this.cluster.connect(); } @@ -499,6 +505,23 @@ public class CassandraClient { return null; } + /** + * This method returns configured read consistency level. + * @return read Consistency level + */ + public String getReadConsistencyLevel() { + return readConsistencyLevel; + } + + /** + * This method returns configured write consistency level. + * @return write Consistency level + */ + public String getWriteConsistencyLevel() { + return writeConsistencyLevel; + } + + public void close() { this.session.close(); this.cluster.close(); http://git-wip-us.apache.org/repos/asf/gora/blob/1fba4162/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraStoreParameters.java ---------------------------------------------------------------------- diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraStoreParameters.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraStoreParameters.java index bb758f6..05c63ab 100644 --- a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraStoreParameters.java +++ b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraStoreParameters.java @@ -224,4 +224,14 @@ public class CassandraStoreParameters { * string */ public static final String CUSTOM_CODEC_FILE = "gora.cassandrastore.custom.codec.file"; + /** + * Property pointing to set consistency level for read queries + * "ALL", "ANY", "EACH_QUORUM", "LOCAL_ONE", "LOCAL_QUORUM", "LOCAL_SERIAL", "ONE", "QUORUM", "SERIAL", "THREE", "TWO" + */ + public static final String READ_CONSISTENCY_LEVEL = "gora.cassandrastore.read.consistencyLevel"; + /** + * Property pointing to set consistency level for write queries + * "ALL", "ANY", "EACH_QUORUM", "LOCAL_ONE", "LOCAL_QUORUM", "LOCAL_SERIAL", "ONE", "QUORUM", "SERIAL", "THREE", "TWO" + */ + public static final String WRITE_CONSISTENCY_LEVEL = "gora.cassandrastore.write.consistencyLevel"; } http://git-wip-us.apache.org/repos/asf/gora/blob/1fba4162/gora-cassandra-cql/src/test/java/org/apache/gora/cassandra/store/TestAvroSerializationWithUDT.java ---------------------------------------------------------------------- diff --git a/gora-cassandra-cql/src/test/java/org/apache/gora/cassandra/store/TestAvroSerializationWithUDT.java b/gora-cassandra-cql/src/test/java/org/apache/gora/cassandra/store/TestAvroSerializationWithUDT.java index 71b2b1d..c016893 100644 --- a/gora-cassandra-cql/src/test/java/org/apache/gora/cassandra/store/TestAvroSerializationWithUDT.java +++ b/gora-cassandra-cql/src/test/java/org/apache/gora/cassandra/store/TestAvroSerializationWithUDT.java @@ -58,6 +58,8 @@ public class TestAvroSerializationWithUDT { parameter.setProperty(CassandraStoreParameters.CASSANDRA_SERIALIZATION_TYPE, "avro"); parameter.setProperty(CassandraStoreParameters.PROTOCOL_VERSION, "3"); parameter.setProperty(CassandraStoreParameters.CLUSTER_NAME, "Test Cluster"); + parameter.setProperty(CassandraStoreParameters.READ_CONSISTENCY_LEVEL,"ONE"); + parameter.setProperty(CassandraStoreParameters.WRITE_CONSISTENCY_LEVEL,"ONE"); parameter.setProperty("gora.cassandrastore.mapping.file", "avroUDT/gora-cassandra-mapping.xml"); } http://git-wip-us.apache.org/repos/asf/gora/blob/1fba4162/gora-cassandra-cql/src/test/java/org/apache/gora/cassandra/store/TestCassandraStore.java ---------------------------------------------------------------------- diff --git a/gora-cassandra-cql/src/test/java/org/apache/gora/cassandra/store/TestCassandraStore.java b/gora-cassandra-cql/src/test/java/org/apache/gora/cassandra/store/TestCassandraStore.java index caf375e..ce9e2df 100644 --- a/gora-cassandra-cql/src/test/java/org/apache/gora/cassandra/store/TestCassandraStore.java +++ b/gora-cassandra-cql/src/test/java/org/apache/gora/cassandra/store/TestCassandraStore.java @@ -68,6 +68,8 @@ public class TestCassandraStore extends DataStoreTestBase { properties.setProperty(CassandraStoreParameters.CASSANDRA_SERIALIZATION_TYPE, "avro"); properties.setProperty(CassandraStoreParameters.PROTOCOL_VERSION, "3"); properties.setProperty(CassandraStoreParameters.CLUSTER_NAME, "Test Cluster"); + properties.setProperty(CassandraStoreParameters.READ_CONSISTENCY_LEVEL,"ONE"); + properties.setProperty(CassandraStoreParameters.WRITE_CONSISTENCY_LEVEL,"ONE"); properties.setProperty("gora.cassandrastore.mapping.file", "avro/gora-cassandra-mapping.xml"); }
