// Origin: gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/AvroCassandraUtils.java
// (introduced as a new file in commit a51b719c).
package org.apache.gora.cassandra.serializers;

import org.apache.avro.Schema;
import org.apache.avro.specific.SpecificData;
import org.apache.avro.util.Utf8;
import org.apache.gora.cassandra.bean.CassandraKey;
import org.apache.gora.cassandra.bean.Field;
import org.apache.gora.cassandra.store.CassandraMapping;
import org.apache.gora.persistency.Persistent;
import org.apache.gora.persistency.impl.DirtyListWrapper;
import org.apache.gora.persistency.impl.DirtyMapWrapper;
import org.apache.gora.persistency.impl.PersistentBase;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Static helpers used by the Avro serializer of the Cassandra data store: they translate
 * between Avro bean values and the plain Java values that are bound to / read from CQL
 * statements.
 */
public class AvroCassandraUtils {

  /**
   * Default schema index with value "0" used when AVRO Union data types are stored.
   */
  private static final int DEFAULT_UNION_SCHEMA = 0;

  private static final Logger LOG = LoggerFactory.getLogger(AvroCassandraUtils.class);

  /**
   * Collects the key field names and the matching key values for {@code key}.
   * Both output lists are appended to in lock-step, so index {@code i} of {@code keys}
   * corresponds to index {@code i} of {@code values}.
   *
   * <ul>
   *   <li>No partition key defined: the mapping's default key field is used together with
   *       {@code key.toString()}.</li>
   *   <li>Partition key defined and a {@link CassandraKey} is present: every schema field of
   *       the key bean that is known to the mapping is added; unknown fields are logged at
   *       debug level and skipped. A key that is not a {@link PersistentBase} is logged as an
   *       error and nothing is added.</li>
   *   <li>Partition key defined inline (no {@link CassandraKey}): {@code key} itself is added
   *       once per inline partition key field.</li>
   * </ul>
   *
   * @param cassandraMapping the mapping describing the table
   * @param key              the key object supplied by the caller
   * @param keys             output list receiving the key field names
   * @param values           output list receiving the key values
   */
  static void processKeys(CassandraMapping cassandraMapping, Object key, List<String> keys, List<Object> values) {
    if (!cassandraMapping.isPartitionKeyDefined()) {
      keys.add(cassandraMapping.getDefaultCassandraKey().getFieldName());
      values.add(key.toString());
      return;
    }

    CassandraKey cassandraKey = cassandraMapping.getCassandraKey();
    if (cassandraKey == null) {
      // Partition keys declared inline on the persistent fields: the key is a single value.
      for (Field field : cassandraMapping.getInlinedDefinedPartitionKeys()) {
        keys.add(field.getFieldName());
        values.add(key);
      }
      return;
    }

    if (!(key instanceof PersistentBase)) {
      // The message previously had a single placeholder for two arguments and therefore
      // reported the key class as if it were the required base class.
      LOG.error("Key bean {} isn't extended by {} .", cassandraMapping.getKeyClass(), PersistentBase.class);
      return;
    }

    PersistentBase keyBase = (PersistentBase) key;
    for (Schema.Field field : keyBase.getSchema().getFields()) {
      if (cassandraMapping.getFieldFromFieldName(field.name()) == null) {
        LOG.debug("Ignoring field {}, Since field couldn't find in the {} mapping",
                field.name(), cassandraMapping.getPersistentClass());
        continue;
      }
      keys.add(field.name());
      Object value = keyBase.get(field.pos());
      values.add(getFieldValueFromAvroBean(field.schema(), field.schema().getType(), value));
    }
  }

  /**
   * For every field within an object, we pass in a field schema, Type and value.
   * This enables us to process fields (based on their characteristics)
   * preparing them for persistence.
   *
   * <p>Records, maps and arrays are converted recursively, unions are resolved to the branch
   * matching the runtime value, and strings are converted with {@code toString()}. Any other
   * type is returned unchanged.
   *
   * @param fieldSchema the associated field schema
   * @param type        the field type
   * @param fieldValue  the field value, may be {@code null}
   * @return the converted value, or {@code null} when {@code fieldValue} is {@code null}
   */
  @SuppressWarnings("unchecked")
  static Object getFieldValueFromAvroBean(Schema fieldSchema, Schema.Type type, Object fieldValue) {
    if (fieldValue == null) {
      // Nothing to convert; previously this raised a NullPointerException for
      // STRING, RECORD, MAP and ARRAY typed fields.
      return null;
    }
    switch (type) {
      case RECORD: {
        PersistentBase persistent = (PersistentBase) fieldValue;
        PersistentBase newRecord = (PersistentBase) SpecificData.get().newRecord(persistent, persistent.getSchema());
        for (Schema.Field member : fieldSchema.getFields()) {
          // NOTE(review): the member at position 0 is always skipped, and no member is
          // converted when the record is not dirty - confirm this matches the bean layout.
          if (member.pos() == 0 || !persistent.isDirty()) {
            continue;
          }
          Schema memberSchema = member.schema();
          Object memberValue = persistent.get(member.pos());
          newRecord.put(member.pos(), getFieldValueFromAvroBean(memberSchema, memberSchema.getType(), memberValue));
        }
        return newRecord;
      }
      case MAP: {
        Schema valueSchema = fieldSchema.getValueType();
        Schema.Type valueType = valueSchema.getType();
        Map<String, Object> map = new HashMap<>();
        for (Map.Entry<CharSequence, ?> e : ((Map<CharSequence, ?>) fieldValue).entrySet()) {
          map.put(e.getKey().toString(), getFieldValueFromAvroBean(valueSchema, valueType, e.getValue()));
        }
        return map;
      }
      case ARRAY: {
        Schema elementSchema = fieldSchema.getElementType();
        Schema.Type elementType = elementSchema.getType();
        List<Object> list = new ArrayList<>();
        for (Object item : (Collection<?>) fieldValue) {
          list.add(getFieldValueFromAvroBean(elementSchema, elementType, item));
        }
        return list;
      }
      case UNION: {
        // Resolve the union branch selected by the runtime value, then convert with it.
        int schemaPos = getUnionSchema(fieldValue, fieldSchema);
        Schema unionSchema = fieldSchema.getTypes().get(schemaPos);
        return getFieldValueFromAvroBean(unionSchema, unionSchema.getType(), fieldValue);
      }
      case STRING:
        return fieldValue.toString();
      default:
        return fieldValue;
    }
  }

  /**
   * Given an object and the object schema this function obtains,
   * from within the UNION schema, the position of the type used.
   * If no data type can be inferred then we return a default value
   * of position 0.
   *
   * <p>A {@code null} value selects the union's {@code NULL} branch when it has one.
   *
   * @param pValue       the runtime value, may be {@code null}
   * @param pUnionSchema the union schema whose branches are inspected in order
   * @return the unionSchemaPosition.
   */
  private static int getUnionSchema(Object pValue, Schema pUnionSchema) {
    int unionSchemaPos = 0;
    for (Schema currentSchema : pUnionSchema.getTypes()) {
      if (matchesUnionBranch(pValue, currentSchema.getType())) {
        return unionSchemaPos;
      }
      unionSchemaPos++;
    }
    // if we weren't able to determine which data type it is, then we return the default
    return DEFAULT_UNION_SCHEMA;
  }

  /**
   * Tells whether {@code value} is an instance of the Java type used for the Avro {@code type}.
   */
  private static boolean matchesUnionBranch(Object value, Schema.Type type) {
    switch (type) {
      case NULL:
        return value == null;
      case STRING:
        return value instanceof CharSequence;
      case BYTES:
        return value instanceof ByteBuffer;
      case INT:
        return value instanceof Integer;
      case LONG:
        return value instanceof Long;
      case DOUBLE:
        return value instanceof Double;
      case FLOAT:
        return value instanceof Float;
      case BOOLEAN:
        return value instanceof Boolean;
      case MAP:
        return value instanceof Map;
      case ARRAY:
        return value instanceof List;
      case RECORD:
        return value instanceof Persistent;
      default:
        return false;
    }
  }

  /**
   * Replaces the characters {@code . : ; space % =} in {@code key} with substitute
   * characters ({@code U+00B7}, {@code U+00FF}, {@code U+00FE}, {@code U+00FD},
   * {@code U+00FC}, {@code U+00FB}).
   *
   * @param key the key to encode, may be {@code null}
   * @return the encoded key, or {@code null} when {@code key} is {@code null}
   */
  static String encodeFieldKey(final String key) {
    if (key == null) {
      return null;
    }
    return key.replace(".", "\u00B7")
            .replace(":", "\u00FF")
            .replace(";", "\u00FE")
            .replace(" ", "\u00FD")
            .replace("%", "\u00FC")
            .replace("=", "\u00FB");
  }

  /**
   * Reverses the character substitution applied by {@link #encodeFieldKey(String)}.
   *
   * @param key the key to decode, may be {@code null}
   * @return the decoded key, or {@code null} when {@code key} is {@code null}
   */
  static String decodeFieldKey(final String key) {
    if (key == null) {
      return null;
    }
    return key.replace("\u00B7", ".")
            .replace("\u00FF", ":")
            .replace("\u00FE", ";")
            .replace("\u00FD", " ")
            .replace("\u00FC", "%")
            .replace("\u00FB", "=");
  }

  /**
   * Converts a value read from the store into the representation expected by an Avro bean
   * field of the given schema.
   *
   * <ul>
   *   <li>MAP / ARRAY: elements are converted recursively and wrapped in a
   *       {@link DirtyMapWrapper} / {@link DirtyListWrapper}; a {@code null} value yields an
   *       empty wrapper.</li>
   *   <li>UNION: the branch matching the runtime value is resolved and used.</li>
   *   <li>ENUM, BYTES, STRING: converted to the enum constant, {@link ByteBuffer} and
   *       {@link Utf8} respectively; a {@code null} value yields {@code null}.</li>
   *   <li>Everything else is returned as is.</li>
   * </ul>
   *
   * @param value  the raw value, may be {@code null}
   * @param schema the Avro schema of the target field
   * @return the value to put into the Avro bean
   */
  static Object getAvroFieldValue(Object value, Schema schema) {
    switch (schema.getType()) {
      case MAP: {
        Map<Utf8, Object> deserializableMap = new HashMap<>();
        if (value != null) {
          Schema innerSchema = schema.getValueType();
          for (Map.Entry<?, ?> e : ((Map<?, ?>) value).entrySet()) {
            Object rawKey = e.getKey();
            Utf8 avroKey = rawKey instanceof Utf8 ? (Utf8) rawKey : new Utf8(rawKey.toString());
            deserializableMap.put(avroKey, getAvroFieldValue(e.getValue(), innerSchema));
          }
        }
        return new DirtyMapWrapper<>(deserializableMap);
      }

      case ARRAY: {
        List<Object> deserializableList = new ArrayList<>();
        if (value != null) {
          Schema elementSchema = schema.getElementType();
          for (Object item : (Collection<?>) value) {
            deserializableList.add(getAvroFieldValue(item, elementSchema));
          }
        }
        return new DirtyListWrapper<>(deserializableList);
      }

      case RECORD:
        return (PersistentBase) value;

      case UNION: {
        int index = getUnionSchema(value, schema);
        Schema resolvedSchema = schema.getTypes().get(index);
        return getAvroFieldValue(value, resolvedSchema);
      }

      case ENUM:
        return value == null ? null : org.apache.gora.util.AvroUtils.getEnumValue(schema, value.toString());

      case BYTES:
        if (value == null || value instanceof ByteBuffer) {
          return value;
        }
        return ByteBuffer.wrap((byte[]) value);

      case STRING:
        if (value == null || value instanceof Utf8) {
          return value;
        }
        return new Utf8(value.toString());

      default:
        return value;
    }
  }

}
http://git-wip-us.apache.org/repos/asf/gora/blob/a51b719c/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/AvroSerializer.java ---------------------------------------------------------------------- diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/AvroSerializer.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/AvroSerializer.java index 3b626a4..21d548d 100644 --- a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/AvroSerializer.java +++ b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/AvroSerializer.java @@ -17,11 +17,13 @@ package org.apache.gora.cassandra.serializers; -import com.datastax.driver.core.ColumnMetadata; +import com.datastax.driver.core.ColumnDefinitions; +import com.datastax.driver.core.DataType; +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.Row; import org.apache.avro.Schema; -import org.apache.avro.specific.SpecificData; -import org.apache.gora.cassandra.query.CassandraColumn; -import org.apache.gora.cassandra.query.CassandraRow; +import org.apache.gora.cassandra.bean.Field; +import org.apache.gora.cassandra.query.CassandraResultSet; import org.apache.gora.cassandra.store.CassandraClient; import org.apache.gora.cassandra.store.CassandraMapping; import org.apache.gora.persistency.Persistent; @@ -29,59 +31,253 @@ import org.apache.gora.persistency.impl.PersistentBase; import org.apache.gora.query.Query; import org.apache.gora.query.Result; import org.apache.gora.store.DataStore; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Iterator; import java.util.List; -import java.util.Map; /** - * This class contains the operations relates to Avro Serialization + * This class contains the operations relates to Avro Serialization. 
*/ class AvroSerializer<K, T extends PersistentBase> extends CassandraSerializer { - /** - * Default schema index with value "0" used when AVRO Union data types are stored - */ - public static final int DEFAULT_UNION_SCHEMA = 0; + private static final Logger LOG = LoggerFactory.getLogger(AvroSerializer.class); + + private DataStore<K, T> cassandraDataStore; - AvroSerializer(CassandraClient cassandraClient, Class<K> keyClass, Class<T> persistentClass, CassandraMapping mapping) { - super(cassandraClient, keyClass, persistentClass, mapping); + AvroSerializer(CassandraClient cassandraClient, DataStore<K, T> dataStore, CassandraMapping mapping) { + super(cassandraClient, dataStore.getKeyClass(), dataStore.getKeyClass(), mapping); + this.cassandraDataStore = dataStore; } @Override public Persistent get(Object key, String[] fields) { - return null; + if (fields == null) { + fields = getFields(); + } + ArrayList<String> cassandraKeys = new ArrayList<>(); + ArrayList<Object> cassandraValues = new ArrayList<>(); + AvroCassandraUtils.processKeys(mapping, key, cassandraKeys, cassandraValues); + String cqlQuery = CassandraQueryFactory.getSelectObjectWithFieldsQuery(mapping, fields, cassandraKeys); + ResultSet resultSet = this.client.getSession().execute(cqlQuery, cassandraValues.toArray()); + Iterator<Row> iterator = resultSet.iterator(); + ColumnDefinitions definitions = resultSet.getColumnDefinitions(); + T obj = null; + if (iterator.hasNext()) { + obj = cassandraDataStore.newPersistent(); + Row row = iterator.next(); + populateValuesToPersistent(row, definitions, obj); + } + return obj; } @Override - public void put(Object key, Persistent value) { - + public void put(Object key, Persistent persistent) { + if (persistent instanceof PersistentBase) { + if (persistent.isDirty()) { + PersistentBase persistentBase = (PersistentBase) persistent; + ArrayList<String> fields = new ArrayList<>(); + ArrayList<Object> values = new ArrayList<>(); + AvroCassandraUtils.processKeys(mapping, 
key, fields, values); + for (Schema.Field f : persistentBase.getSchema().getFields()) { + String fieldName = f.name(); + if (mapping.getFieldFromFieldName(fieldName) == null) { + LOG.debug("Ignoring {} adding field, {} field can't find in {} mapping", new Object[]{fieldName, fieldName, persistentClass}); + continue; + } + if (persistent.isDirty(f.pos()) || mapping.getInlinedDefinedPartitionKeys().contains(mapping.getFieldFromFieldName(fieldName))) { + Object value = persistentBase.get(f.pos()); + value = AvroCassandraUtils.getFieldValueFromAvroBean(f.schema(), f.schema().getType(), value); + values.add(value); + fields.add(fieldName); + } + } + String cqlQuery = CassandraQueryFactory.getInsertDataQuery(mapping, fields); + client.getSession().execute(cqlQuery, values.toArray()); + } else { + LOG.info("Ignored putting persistent bean {} in the store as it is neither " + + "new, neither dirty.", new Object[]{persistent}); + } + } else { + LOG.error("{} Persistent bean isn't extended by {} .", new Object[]{this.persistentClass, PersistentBase.class}); + } } @Override public Persistent get(Object key) { - return null; + ArrayList<String> cassandraKeys = new ArrayList<>(); + ArrayList<Object> cassandraValues = new ArrayList<>(); + AvroCassandraUtils.processKeys(mapping, key, cassandraKeys, cassandraValues); + String cqlQuery = CassandraQueryFactory.getSelectObjectQuery(mapping, cassandraKeys); + ResultSet resultSet = this.client.getSession().execute(cqlQuery, cassandraValues.toArray()); + Iterator<Row> iterator = resultSet.iterator(); + ColumnDefinitions definitions = resultSet.getColumnDefinitions(); + T obj = null; + if (iterator.hasNext()) { + obj = cassandraDataStore.newPersistent(); + Row row = iterator.next(); + populateValuesToPersistent(row, definitions, obj); + } + return obj; } - @Override - public boolean delete(Object key) { - return false; - } + /** + * This method wraps result set data in to DataEntry and creates a list of DataEntry. 
+ **/ + private void populateValuesToPersistent(Row row, ColumnDefinitions columnDefinitions, PersistentBase base) { + Object paramValue; + for (Schema.Field avrofield : base.getSchema().getFields()) { - @Override - public Result execute(DataStore dataStore,Query query) { - return null; + Field field = mapping.getFieldFromFieldName(avrofield.name()); + //to ignore unspecified fields in the mapping + if (field == null) { + continue; + } + Schema fieldSchema = avrofield.schema(); + String columnName = field.getColumnName(); + DataType columnType = columnDefinitions.getType(columnName); + + switch (columnType.getName()) { + case ASCII: + paramValue = row.getString(columnName); + break; + case BIGINT: + paramValue = row.isNull(columnName) ? null : row.getLong(columnName); + break; + case BLOB: + paramValue = row.isNull(columnName) ? null : row.getBytes(columnName); + break; + case BOOLEAN: + paramValue = row.isNull(columnName) ? null : row.getBool(columnName); + break; + case COUNTER: + paramValue = row.isNull(columnName) ? null : row.getLong(columnName); + break; + case DECIMAL: + paramValue = row.isNull(columnName) ? null : row.getDecimal(columnName); + break; + case DOUBLE: + paramValue = row.isNull(columnName) ? null : row.getDouble(columnName); + break; + case FLOAT: + paramValue = row.isNull(columnName) ? null : row.getFloat(columnName); + break; + case INET: + paramValue = row.isNull(columnName) ? null : row.getInet(columnName).toString(); + break; + case INT: + paramValue = row.isNull(columnName) ? null : row.getInt(columnName); + break; + case TEXT: + paramValue = row.getString(columnName); + break; + case TIMESTAMP: + paramValue = row.isNull(columnName) ? null : row.getDate(columnName); + break; + case UUID: + paramValue = row.isNull(columnName) ? null : row.getUUID(columnName); + break; + case VARCHAR: + paramValue = row.getString(columnName); + break; + case VARINT: + paramValue = row.isNull(columnName) ? 
null : row.getVarint(columnName); + break; + case TIMEUUID: + paramValue = row.isNull(columnName) ? null : row.getUUID(columnName); + break; + case LIST: + String dataType = field.getType(); + dataType = dataType.substring(dataType.indexOf("<") + 1, dataType.indexOf(">")); + paramValue = row.isNull(columnName) ? null : row.getList(columnName, getRelevantClassForCassandraDataType(dataType)); + break; + case SET: + dataType = field.getType(); + dataType = dataType.substring(dataType.indexOf("<") + 1, dataType.indexOf(">")); + paramValue = row.isNull(columnName) ? null : row.getList(columnName, getRelevantClassForCassandraDataType(dataType)); + break; + case MAP: + dataType = field.getType(); + dataType = dataType.substring(dataType.indexOf("<") + 1, dataType.indexOf(">")); + dataType = dataType.split(",")[1]; + // Avro supports only String for keys + paramValue = row.isNull(columnName) ? null : row.getMap(columnName, String.class, getRelevantClassForCassandraDataType(dataType)); + break; + case UDT: + paramValue = row.isNull(columnName) ? null : row.getUDTValue(columnName).toString(); + break; + case TUPLE: + paramValue = row.isNull(columnName) ? null : row.getTupleValue(columnName).toString(); + break; + case CUSTOM: + paramValue = row.isNull(columnName) ? 
null : row.getBytes(columnName); + break; + default: + paramValue = row.getString(columnName); + break; + } + Object value = AvroCassandraUtils.getAvroFieldValue(paramValue, fieldSchema); + base.put(avrofield.pos(), value); + } + } + + private Class getRelevantClassForCassandraDataType(String dataType) { + switch (dataType) { + //// TODO: 7/25/17 support all the datatypes + case "ascii": + case "text": + case "varchar": + return String.class; + case "blob": + return ByteBuffer.class; + default: + throw new RuntimeException("Invalid Cassandra DataType"); + } } @Override - public long deleteByQuery(Query query) { - return 0; + public boolean delete(Object key) { + ArrayList<String> cassandraKeys = new ArrayList<>(); + ArrayList<Object> cassandraValues = new ArrayList<>(); + AvroCassandraUtils.processKeys(mapping, key, cassandraKeys, cassandraValues); + String cqlQuery = CassandraQueryFactory.getDeleteDataQuery(mapping, cassandraKeys); + ResultSet resultSet = this.client.getSession().execute(cqlQuery, cassandraValues.toArray()); + return resultSet.wasApplied(); } + @Override - public boolean updateByQuery(Query query) { - return false; + public Result execute(DataStore dataStore, Query query) { + List<Object> objectArrayList = new ArrayList<>(); + CassandraResultSet<K, T> cassandraResult = new CassandraResultSet<>(dataStore, query); + String cqlQuery = CassandraQueryFactory.getExecuteQuery(mapping, query, objectArrayList); + ResultSet results; + if (objectArrayList.size() == 0) { + results = client.getSession().execute(cqlQuery); + } else { + results = client.getSession().execute(cqlQuery, objectArrayList.toArray()); + } + Iterator<Row> iterator = results.iterator(); + ColumnDefinitions definitions = results.getColumnDefinitions(); + T obj = null; + K keyObject = null; + long count = 0; + while (iterator.hasNext()) { + Row row = iterator.next(); + obj = cassandraDataStore.newPersistent(); + keyObject = cassandraDataStore.newKey(); + populateValuesToPersistent(row, 
definitions, obj); + populateValuesToPersistent(row, definitions, (PersistentBase) keyObject); + cassandraResult.addResultElement(keyObject, obj); + count ++; + } + cassandraResult.setLimit(count); + return cassandraResult; } + } http://git-wip-us.apache.org/repos/asf/gora/blob/a51b719c/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/CassandraQueryFactory.java ---------------------------------------------------------------------- diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/CassandraQueryFactory.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/CassandraQueryFactory.java index ebb7c20..865a8b3 100644 --- a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/CassandraQueryFactory.java +++ b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/CassandraQueryFactory.java @@ -26,12 +26,15 @@ import org.apache.gora.cassandra.bean.Field; import org.apache.gora.cassandra.bean.KeySpace; import org.apache.gora.cassandra.bean.PartitionKeyField; import org.apache.gora.cassandra.query.CassandraQuery; -import org.apache.gora.cassandra.query.CassandraRow; import org.apache.gora.cassandra.store.CassandraMapping; import org.apache.gora.query.Query; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.Locale; import java.util.Map; /** @@ -39,6 +42,8 @@ import java.util.Map; */ class CassandraQueryFactory { + private static final Logger LOG = LoggerFactory.getLogger(CassandraQueryFactory.class); + /** * This method returns the CQL query to create key space. * refer : http://docs.datastax.com/en/cql/3.1/cql/cql_reference/create_keyspace_r.html @@ -83,7 +88,7 @@ class CassandraQueryFactory { * It's very much needed to follow the same order in other CRUD operations as well. 
* * @param mapping Cassandra mapping - * @return CQL + * @return CQL Query */ static String getCreateTableQuery(CassandraMapping mapping) { StringBuilder stringBuffer = new StringBuilder(); @@ -91,23 +96,6 @@ class CassandraQueryFactory { boolean isCommaNeeded = false; CassandraKey cassandraKey = mapping.getCassandraKey(); // appending Cassandra key columns into db schema - if (cassandraKey != null) { - for (PartitionKeyField partitionKeyField : cassandraKey.getPartitionKeyFields()) { - if (partitionKeyField.isComposite()) { - for (Field compositeField : partitionKeyField.getFields()) { - stringBuffer = processFields(stringBuffer, compositeField, isCommaNeeded); - } - - } else { - stringBuffer = processFields(stringBuffer, partitionKeyField, isCommaNeeded); - } - isCommaNeeded = true; - } - for (ClusterKeyField clusterKeyField : cassandraKey.getClusterKeyFields()) { - stringBuffer = processFields(stringBuffer, clusterKeyField, isCommaNeeded); - } - } - // appending Other columns for (Field field : mapping.getFieldList()) { if (isCommaNeeded) { stringBuffer.append(", "); @@ -151,6 +139,11 @@ class CassandraQueryFactory { } stringBuffer.append(")"); } + } else { + if (!stringBuffer.toString().toLowerCase(Locale.ENGLISH).contains("primary key")) { + Field field = mapping.getDefaultCassandraKey(); + stringBuffer.append(", ").append(field.getFieldName()).append(" ").append(field.getType()).append(" PRIMARY KEY "); + } } stringBuffer.append(")"); @@ -196,18 +189,6 @@ class CassandraQueryFactory { return stringBuffer.toString(); } - private static StringBuilder processFields(StringBuilder stringBuilder, Field field, boolean isCommaNeeded) { - if (isCommaNeeded) { - stringBuilder.append(", "); - } - stringBuilder.append(field.getColumnName()).append(" ").append(field.getType()); - boolean isStaticColumn = Boolean.parseBoolean(field.getProperty("static")); - if (isStaticColumn) { - stringBuilder.append(" STATIC"); - } - return stringBuilder; - } - /** * This method returns 
the CQL query to drop table. * refer : http://docs.datastax.com/en/cql/3.1/cql/cql_reference/drop_table_r.html @@ -246,32 +227,91 @@ class CassandraQueryFactory { * * @return */ - static String getInsertDataQuery(CassandraMapping mapping, CassandraRow row) { - String query = QueryBuilder.insertInto(mapping.getKeySpace().getName(), mapping.getCoreName()).values(row.getFields(), row.getValues()).getQueryString(); - return query; + static String getInsertDataQuery(CassandraMapping mapping, List<String> fields) { + String[] columnNames = getColumnNames(mapping, fields); + String[] objects = new String[fields.size()]; + Arrays.fill(objects, "?"); + return QueryBuilder.insertInto(mapping.getKeySpace().getName(), mapping.getCoreName()).values(columnNames, objects).getQueryString(); } - static <K> String getObjectWithFieldsQuery(CassandraMapping mapping, String[] fields, K key, List<Object> objects) { + /** + * + * @param mapping + * @param fields + * @return + */ + static String getDeleteDataQuery(CassandraMapping mapping, List<String> fields) { + String[] columnNames = getColumnNames(mapping, fields); + String[] objects = new String[fields.size()]; + Arrays.fill(objects, "?"); + Delete delete = QueryBuilder.delete().from(mapping.getKeySpace().getName(), mapping.getCoreName()); + Delete.Where query = null; + boolean isWhereNeeded = true; + for (String columnName : columnNames) { + if (isWhereNeeded) { + query = delete.where(QueryBuilder.eq(columnName, "?")); + isWhereNeeded = false; + } else { + query = query.and(QueryBuilder.eq(columnName, "?")); + } + } + return query.getQueryString(); + } + + static String getSelectObjectQuery(CassandraMapping mapping, List<String> keyFields) { + Select select = QueryBuilder.select().from(mapping.getKeySpace().getName(), mapping.getCoreName()); + if(Boolean.parseBoolean(mapping.getProperty("allowFiltering"))) { + select.allowFiltering(); + } + String[] columnNames = getColumnNames(mapping, keyFields); + Select.Where query = null; + 
boolean isWhereNeeded = true; + for (String columnName : columnNames) { + if (isWhereNeeded) { + query = select.where(QueryBuilder.eq(columnName, "?")); + isWhereNeeded = false; + } else { + query = query.and(QueryBuilder.eq(columnName, "?")); + } + } + return query.getQueryString(); + } + + static String getSelectObjectWithFieldsQuery(CassandraMapping mapping, String[] fields, List<String> keyFields) { + Select select = QueryBuilder.select(getColumnNames(mapping, Arrays.asList(fields))).from(mapping.getKeySpace().getName(), mapping.getCoreName()); + if(Boolean.parseBoolean(mapping.getProperty("allowFiltering"))) { + select.allowFiltering(); + } + String[] columnNames = getColumnNames(mapping, keyFields); + Select.Where query = null; + boolean isWhereNeeded = true; + for (String columnName : columnNames) { + if (isWhereNeeded) { + query = select.where(QueryBuilder.eq(columnName, "?")); + isWhereNeeded = false; + } else { + query = query.and(QueryBuilder.eq(columnName, "?")); + } + } + return query.getQueryString(); + } + + static String getSelectObjectWithFieldsQuery(CassandraMapping mapping, String[] fields) { String cqlQuery = null; - Select select = QueryBuilder.select(fields).from(mapping.getKeySpace().getName(), mapping.getCoreName()); + String[] columnNames = getColumnNames(mapping, Arrays.asList(fields)); + Select select = QueryBuilder.select(columnNames).from(mapping.getKeySpace().getName(), mapping.getCoreName()); + if(Boolean.parseBoolean(mapping.getProperty("allowFiltering"))) { + select.allowFiltering(); + } CassandraKey cKey = mapping.getCassandraKey(); if (cKey != null) { Select.Where query = null; boolean isWhereNeeded = true; - for (PartitionKeyField field : cKey.getPartitionKeyFields()) { - if (field.isComposite()) { - for (Field compositeField : field.getFields()) { - if (isWhereNeeded) { - query = select.where(QueryBuilder.eq(compositeField.getColumnName(), "?")); - isWhereNeeded = false; - } - query = 
query.and(QueryBuilder.eq(compositeField.getColumnName(), "?")); - } + for (Field field : cKey.getFieldList()) { + if (isWhereNeeded) { + query = select.where(QueryBuilder.eq(field.getColumnName(), "?")); + isWhereNeeded = false; } else { - if (isWhereNeeded) { - query = select.where(QueryBuilder.eq(field.getColumnName(), "?")); - isWhereNeeded = false; - } query = query.and(QueryBuilder.eq(field.getColumnName(), "?")); } } @@ -281,7 +321,6 @@ class CassandraQueryFactory { boolean isPrimaryKey = Boolean.parseBoolean(field.getProperty("primarykey")); if (isPrimaryKey) { cqlQuery = select.where(QueryBuilder.eq(field.getColumnName(), "?")).getQueryString(); - objects.add(key); break; } } @@ -298,20 +337,53 @@ class CassandraQueryFactory { Object key = cassandraQuery.getKey(); String primaryKey = null; long limit = cassandraQuery.getLimit(); - Select select = QueryBuilder.select(getColumnNames(mapping, fields)).from(mapping.getKeySpace().getName(), mapping.getCoreName()); + Select select = QueryBuilder.select(getColumnNames(mapping, Arrays.asList(fields))).from(mapping.getKeySpace().getName(), mapping.getCoreName()); if (limit > 0) { select = select.limit((int) limit); } + if(Boolean.parseBoolean(mapping.getProperty("allowFiltering"))) { + select.allowFiltering(); + } Select.Where query = null; boolean isWhereNeeded = true; if (key != null) { - primaryKey = getPKey(mapping.getFieldList()); - query = select.where(QueryBuilder.eq(primaryKey, "?")); - objects.add(key); + if (mapping.getCassandraKey() != null) { + ArrayList<String> cassandraKeys = new ArrayList<>(); + ArrayList<Object> cassandraValues = new ArrayList<>(); + AvroCassandraUtils.processKeys(mapping, key, cassandraKeys, cassandraValues); + String[] columnKeys = getColumnNames(mapping, cassandraKeys); + for (int i = 0; i < cassandraKeys.size(); i++) { + if (isWhereNeeded) { + query = select.where(QueryBuilder.eq(columnKeys[i], "?")); + objects.add(cassandraValues.get(i)); + isWhereNeeded = false; + } else { + 
query = query.and(QueryBuilder.eq(columnKeys[i], "?")); + objects.add(cassandraValues.get(i)); + } + } + } else { + primaryKey = getPKey(mapping.getFieldList()); + query = select.where(QueryBuilder.eq(primaryKey, "?")); + objects.add(key); + } } else { if (startKey != null) { if (mapping.getCassandraKey() != null) { -//todo avro serialization + ArrayList<String> cassandraKeys = new ArrayList<>(); + ArrayList<Object> cassandraValues = new ArrayList<>(); + AvroCassandraUtils.processKeys(mapping, startKey, cassandraKeys, cassandraValues); + String[] columnKeys = getColumnNames(mapping, cassandraKeys); + for (int i = 0; i < cassandraKeys.size(); i++) { + if (isWhereNeeded) { + query = select.where(QueryBuilder.gte(columnKeys[i], "?")); + objects.add(cassandraValues.get(i)); + isWhereNeeded = false; + } else { + query = query.and(QueryBuilder.gte(columnKeys[i], "?")); + objects.add(cassandraValues.get(i)); + } + } } else { primaryKey = getPKey(mapping.getFieldList()); query = select.where(QueryBuilder.gte(primaryKey, "?")); @@ -321,7 +393,20 @@ class CassandraQueryFactory { } if (endKey != null) { if (mapping.getCassandraKey() != null) { -//todo avro serialization + ArrayList<String> cassandraKeys = new ArrayList<>(); + ArrayList<Object> cassandraValues = new ArrayList<>(); + AvroCassandraUtils.processKeys(mapping, endKey, cassandraKeys, cassandraValues); + String[] columnKeys = getColumnNames(mapping, cassandraKeys); + for (int i = 0; i < cassandraKeys.size(); i++) { + if (isWhereNeeded) { + query = select.where(QueryBuilder.lte(columnKeys[i], "?")); + objects.add(cassandraValues.get(i)); + isWhereNeeded = false; + } else { + query = query.and(QueryBuilder.lte(columnKeys[i], "?")); + objects.add(cassandraValues.get(i)); + } + } } else { primaryKey = primaryKey != null ? 
primaryKey : getPKey(mapping.getFieldList()); if (isWhereNeeded) { @@ -339,14 +424,21 @@ class CassandraQueryFactory { return query.getQueryString(); } - private static String[] getColumnNames(CassandraMapping mapping, String[] fields) { - String[] columnNames = new String[fields.length]; - int i = 0; + private static String[] getColumnNames(CassandraMapping mapping, List<String> fields) { + ArrayList<String> columnNames = new ArrayList<>(); for (String field : fields) { - columnNames[i] = mapping.getField(field).getColumnName(); - i++; + Field fieldBean = mapping.getFieldFromFieldName(field); + if (fieldBean != null) { + columnNames.add(fieldBean.getColumnName()); + } else { + if (mapping.getDefaultCassandraKey().getFieldName().equals(field)) { + columnNames.add(field); + } else { + LOG.warn("{} field is ignored, couldn't find relavant field in the persistent mapping", field); + } + } } - return columnNames; + return columnNames.toArray(new String[0]); } private static String getPKey(List<Field> fields) { @@ -362,7 +454,7 @@ class CassandraQueryFactory { static String getDeleteByQuery(CassandraMapping mapping, Query cassandraQuery, List<Object> objects) { String[] columns = null; if (!Arrays.equals(cassandraQuery.getFields(), mapping.getFieldNames())) { - columns = getColumnNames(mapping, cassandraQuery.getFields()); + columns = getColumnNames(mapping, Arrays.asList(cassandraQuery.getFields())); } Object startKey = cassandraQuery.getStartKey(); Object endKey = cassandraQuery.getEndKey(); @@ -377,13 +469,43 @@ class CassandraQueryFactory { Delete.Where query = null; boolean isWhereNeeded = true; if (key != null) { - primaryKey = getPKey(mapping.getFieldList()); - query = delete.where(QueryBuilder.eq(primaryKey, "?")); - objects.add(key); + if (mapping.getCassandraKey() != null) { + ArrayList<String> cassandraKeys = new ArrayList<>(); + ArrayList<Object> cassandraValues = new ArrayList<>(); + AvroCassandraUtils.processKeys(mapping, key, cassandraKeys, 
cassandraValues); + String[] columnKeys = getColumnNames(mapping, cassandraKeys); + for (int i = 0; i < cassandraKeys.size(); i++) { + if (isWhereNeeded) { + query = delete.where(QueryBuilder.eq(columnKeys[i], "?")); + objects.add(cassandraValues.get(i)); + isWhereNeeded = false; + } else { + query = query.and(QueryBuilder.eq(columnKeys[i], "?")); + objects.add(cassandraValues.get(i)); + } + } + } else { + primaryKey = getPKey(mapping.getFieldList()); + query = delete.where(QueryBuilder.eq(primaryKey, "?")); + objects.add(key); + } } else { if (startKey != null) { if (mapping.getCassandraKey() != null) { -//todo avro serialization + ArrayList<String> cassandraKeys = new ArrayList<>(); + ArrayList<Object> cassandraValues = new ArrayList<>(); + AvroCassandraUtils.processKeys(mapping, startKey, cassandraKeys, cassandraValues); + String[] columnKeys = getColumnNames(mapping, cassandraKeys); + for (int i = 0; i < cassandraKeys.size(); i++) { + if (isWhereNeeded) { + query = delete.where(QueryBuilder.gte(columnKeys[i], "?")); + objects.add(cassandraValues.get(i)); + isWhereNeeded = false; + } else { + query = query.and(QueryBuilder.gte(columnKeys[i], "?")); + objects.add(cassandraValues.get(i)); + } + } } else { primaryKey = getPKey(mapping.getFieldList()); query = delete.where(QueryBuilder.gte(primaryKey, "?")); @@ -393,7 +515,20 @@ class CassandraQueryFactory { } if (endKey != null) { if (mapping.getCassandraKey() != null) { -//todo avro serialization + ArrayList<String> cassandraKeys = new ArrayList<>(); + ArrayList<Object> cassandraValues = new ArrayList<>(); + AvroCassandraUtils.processKeys(mapping, endKey, cassandraKeys, cassandraValues); + String[] columnKeys = getColumnNames(mapping, cassandraKeys); + for (int i = 0; i < cassandraKeys.size(); i++) { + if (isWhereNeeded) { + query = delete.where(QueryBuilder.lte(columnKeys[i], "?")); + objects.add(cassandraValues.get(i)); + isWhereNeeded = false; + } else { + query = query.and(QueryBuilder.lte(columnKeys[i], 
"?")); + objects.add(cassandraValues.get(i)); + } + } } else { primaryKey = primaryKey != null ? primaryKey : getPKey(mapping.getFieldList()); if (isWhereNeeded) { @@ -415,8 +550,7 @@ class CassandraQueryFactory { Update update = QueryBuilder.update(mapping.getKeySpace().getName(), mapping.getCoreName()); Update.Assignments updateAssignments = null; if (cassandraQuery instanceof CassandraQuery) { - String[] fields = cassandraQuery.getFields(); - String[] columnNames = getColumnNames(mapping, fields); + String[] columnNames = getColumnNames(mapping, Arrays.asList(cassandraQuery.getFields())); for (String column : columnNames) { updateAssignments = update.with(QueryBuilder.set(column, "?")); objects.add(((CassandraQuery) cassandraQuery).getUpdateFieldValue(column)); @@ -429,13 +563,43 @@ class CassandraQueryFactory { Object key = cassandraQuery.getKey(); boolean isWhereNeeded = true; if (key != null) { - primaryKey = getPKey(mapping.getFieldList()); - query = updateAssignments.where(QueryBuilder.eq(primaryKey, "?")); - objects.add(key); + if (mapping.getCassandraKey() != null) { + ArrayList<String> cassandraKeys = new ArrayList<>(); + ArrayList<Object> cassandraValues = new ArrayList<>(); + AvroCassandraUtils.processKeys(mapping, key, cassandraKeys, cassandraValues); + String[] columnKeys = getColumnNames(mapping, cassandraKeys); + for (int i = 0; i < cassandraKeys.size(); i++) { + if (isWhereNeeded) { + query = updateAssignments.where(QueryBuilder.eq(columnKeys[i], "?")); + objects.add(cassandraValues.get(i)); + isWhereNeeded = false; + } else { + query = query.and(QueryBuilder.eq(columnKeys[i], "?")); + objects.add(cassandraValues.get(i)); + } + } + } else { + primaryKey = getPKey(mapping.getFieldList()); + query = updateAssignments.where(QueryBuilder.eq(primaryKey, "?")); + objects.add(key); + } } else { if (startKey != null) { if (mapping.getCassandraKey() != null) { -//todo avro serialization + ArrayList<String> cassandraKeys = new ArrayList<>(); + 
ArrayList<Object> cassandraValues = new ArrayList<>(); + AvroCassandraUtils.processKeys(mapping, startKey, cassandraKeys, cassandraValues); + String[] columnKeys = getColumnNames(mapping, cassandraKeys); + for (int i = 0; i < cassandraKeys.size(); i++) { + if (isWhereNeeded) { + query = updateAssignments.where(QueryBuilder.gte(columnKeys[i], "?")); + objects.add(cassandraValues.get(i)); + isWhereNeeded = false; + } else { + query = query.and(QueryBuilder.gte(columnKeys[i], "?")); + objects.add(cassandraValues.get(i)); + } + } } else { primaryKey = getPKey(mapping.getFieldList()); query = updateAssignments.where(QueryBuilder.gte(primaryKey, "?")); @@ -445,7 +609,20 @@ class CassandraQueryFactory { } if (endKey != null) { if (mapping.getCassandraKey() != null) { -//todo avro serialization + ArrayList<String> cassandraKeys = new ArrayList<>(); + ArrayList<Object> cassandraValues = new ArrayList<>(); + AvroCassandraUtils.processKeys(mapping, endKey, cassandraKeys, cassandraValues); + String[] columnKeys = getColumnNames(mapping, cassandraKeys); + for (int i = 0; i < cassandraKeys.size(); i++) { + if (isWhereNeeded) { + query = updateAssignments.where(QueryBuilder.lte(columnKeys[i], "?")); + objects.add(cassandraValues.get(i)); + isWhereNeeded = false; + } else { + query = query.and(QueryBuilder.lte(columnKeys[i], "?")); + objects.add(cassandraValues.get(i)); + } + } } else { primaryKey = primaryKey != null ? 
primaryKey : getPKey(mapping.getFieldList()); if (isWhereNeeded) { http://git-wip-us.apache.org/repos/asf/gora/blob/a51b719c/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/CassandraSerializer.java ---------------------------------------------------------------------- diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/CassandraSerializer.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/CassandraSerializer.java index 237bd8a..a924a31 100644 --- a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/CassandraSerializer.java +++ b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/CassandraSerializer.java @@ -18,7 +18,9 @@ package org.apache.gora.cassandra.serializers; import com.datastax.driver.core.KeyspaceMetadata; +import com.datastax.driver.core.ResultSet; import com.datastax.driver.core.TableMetadata; +import org.apache.gora.cassandra.bean.Field; import org.apache.gora.cassandra.store.CassandraClient; import org.apache.gora.cassandra.store.CassandraMapping; import org.apache.gora.cassandra.store.CassandraStore; @@ -29,6 +31,8 @@ import org.apache.gora.store.DataStore; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; +import java.util.List; import java.util.Locale; /** @@ -90,27 +94,34 @@ public abstract class CassandraSerializer<K, T extends Persistent> { * * @param cc Cassandra Client * @param type Serialization type - * @param keyClass key class - * @param persistentClass persistent class + * @param dataStore Cassandra DataStore * @param mapping Cassandra Mapping * @param <K> key class * @param <T> persistent class * @return Serializer */ - public static <K, T> CassandraSerializer getSerializer(CassandraClient cc, String type, final Class<K> keyClass, final Class<T> persistentClass, CassandraMapping mapping) { + public static <K, T extends Persistent> CassandraSerializer getSerializer(CassandraClient 
cc, String type, final DataStore<K,T> dataStore, CassandraMapping mapping) { CassandraStore.SerializerType serType = type.isEmpty() ? CassandraStore.SerializerType.NATIVE : CassandraStore.SerializerType.valueOf(type.toUpperCase(Locale.ENGLISH)); CassandraSerializer serializer; switch (serType) { case AVRO: - serializer = new AvroSerializer(cc, keyClass, persistentClass, mapping); + serializer = new AvroSerializer(cc, dataStore, mapping); break; case NATIVE: default: - serializer = new NativeSerializer(cc, keyClass, persistentClass, mapping); + serializer = new NativeSerializer(cc, dataStore.getKeyClass(), dataStore.getPersistentClass(), mapping); } return serializer; } + protected String[] getFields() { + List<String> fields = new ArrayList<>(); + for (Field field : mapping.getFieldList()) { + fields.add(field.getFieldName()); + } + return fields.toArray(new String[0]); + } + public abstract void put(K key, T value); public abstract T get(K key); @@ -121,8 +132,30 @@ public abstract class CassandraSerializer<K, T extends Persistent> { public abstract Result<K, T> execute(DataStore<K, T> dataStore, Query<K, T> query); - public abstract long deleteByQuery(Query<K, T> query); + public boolean updateByQuery(Query query) { + List<Object> objectArrayList = new ArrayList<>(); + String cqlQuery = CassandraQueryFactory.getUpdateByQuery(mapping, query, objectArrayList); + ResultSet results; + if (objectArrayList.size() == 0) { + results = client.getSession().execute(cqlQuery); + } else { + results = client.getSession().execute(cqlQuery, objectArrayList.toArray()); + } + return results.wasApplied(); + } - public abstract boolean updateByQuery(Query<K, T> query); + public long deleteByQuery(Query query) { + List<Object> objectArrayList = new ArrayList<>(); + String cqlQuery = CassandraQueryFactory.getDeleteByQuery(mapping, query, objectArrayList); + ResultSet results; + if (objectArrayList.size() == 0) { + results = client.getSession().execute(cqlQuery); + } else { + results = 
client.getSession().execute(cqlQuery, objectArrayList.toArray()); + } + LOG.debug("Delete by Query was applied : " + results.wasApplied()); + LOG.info("Delete By Query method doesn't return the deleted element count."); + return 0; + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/gora/blob/a51b719c/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/NativeSerializer.java ---------------------------------------------------------------------- diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/NativeSerializer.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/NativeSerializer.java index 6f64fa2..5f36ce2 100644 --- a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/NativeSerializer.java +++ b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/NativeSerializer.java @@ -42,7 +42,7 @@ import java.util.List; */ class NativeSerializer<K, T extends CassandraNativePersistent> extends CassandraSerializer { - private static final Logger LOG = LoggerFactory.getLogger(CassandraNativePersistent.class); + private static final Logger LOG = LoggerFactory.getLogger(NativeSerializer.class); private Mapper<T> mapper; @@ -72,9 +72,11 @@ class NativeSerializer<K, T extends CassandraNativePersistent> extends Cassandra @Override public Persistent get(Object key, String[] fields) { - List<Object> objectArrayList = new ArrayList<>(); - String cqlQuery = CassandraQueryFactory.getObjectWithFieldsQuery(mapping, fields, key, objectArrayList); - ResultSet results = client.getSession().execute(cqlQuery, objectArrayList.toArray()); + if(fields == null) { + fields = getFields(); + } + String cqlQuery = CassandraQueryFactory.getSelectObjectWithFieldsQuery(mapping, fields); + ResultSet results = client.getSession().execute(cqlQuery, key); Result<T> objects = mapper.map(results); List<T> objectList = objects.all(); if (objectList != null) { @@ -86,24 +88,9 @@ 
class NativeSerializer<K, T extends CassandraNativePersistent> extends Cassandra } @Override - public long deleteByQuery(Query query) { - List<Object> objectArrayList = new ArrayList<>(); - String cqlQuery = CassandraQueryFactory.getDeleteByQuery(mapping, query, objectArrayList); - ResultSet results; - if (objectArrayList.size() == 0) { - results = client.getSession().execute(cqlQuery); - } else { - results = client.getSession().execute(cqlQuery, objectArrayList.toArray()); - } - LOG.debug("Delete by Query was applied : " + results.wasApplied()); - LOG.info("Delete By Query method doesn't return the deleted element count."); - return 0; - } - - @Override public org.apache.gora.query.Result execute(DataStore dataStore, Query query) { List<Object> objectArrayList = new ArrayList<>(); - CassandraResultSet<K, T> cassandraResult = new CassandraResultSet<K, T>(dataStore, query); + CassandraResultSet<K, T> cassandraResult = new CassandraResultSet<>(dataStore, query); String cqlQuery = CassandraQueryFactory.getExecuteQuery(mapping, query, objectArrayList); ResultSet results; if (objectArrayList.size() == 0) { @@ -113,11 +100,14 @@ class NativeSerializer<K, T extends CassandraNativePersistent> extends Cassandra } Result<T> objects = mapper.map(results); Iterator iterator = objects.iterator(); + long count = 0; while (iterator.hasNext()) { T result = (T) iterator.next(); K key = getKey(result); cassandraResult.addResultElement(key, result); + count ++ ; } + cassandraResult.setLimit(count); return cassandraResult; } @@ -128,19 +118,6 @@ class NativeSerializer<K, T extends CassandraNativePersistent> extends Cassandra mapper = mappingManager.mapper(persistentClass); } - @Override - public boolean updateByQuery(Query query) { - List<Object> objectArrayList = new ArrayList<>(); - String cqlQuery = CassandraQueryFactory.getUpdateByQuery(mapping, query, objectArrayList); - ResultSet results; - if (objectArrayList.size() == 0) { - results = client.getSession().execute(cqlQuery); - } 
else { - results = client.getSession().execute(cqlQuery, objectArrayList.toArray()); - } - return results.wasApplied(); - } - private K getKey(T object) { String keyField = null; for (Field field : mapping.getFieldList()) { http://git-wip-us.apache.org/repos/asf/gora/blob/a51b719c/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraMapping.java ---------------------------------------------------------------------- diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraMapping.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraMapping.java index 7b5d265..61b8d1e 100644 --- a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraMapping.java +++ b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraMapping.java @@ -35,10 +35,18 @@ public class CassandraMapping { private Map<String, String> tableProperties; + private Class keyClass; + + private Class persistentClass; + private KeySpace keySpace; private List<Field> fieldList; + private List<Field> inlinedDefinedPartitionKeys; + + private static final String PRIMARY_KEY = "primarykey"; + private String coreName; public KeySpace getKeySpace() { @@ -53,9 +61,9 @@ public class CassandraMapping { return fieldList; } - public Field getField(String field) { + public Field getFieldFromFieldName(String fieldName) { for (Field field1 : fieldList) { - if (field1.getFieldName().equals(field)) { + if (field1.getFieldName().equals(fieldName)) { return field1; } } @@ -75,11 +83,12 @@ public class CassandraMapping { return cassandraKey; } - public void setCassandraKey(CassandraKey cassandraKey) { + void setCassandraKey(CassandraKey cassandraKey) { this.cassandraKey = cassandraKey; + this.fieldList.addAll(cassandraKey.getFieldList()); } - public CassandraMapping() { + CassandraMapping() { this.fieldList = new ArrayList<>(); this.tableProperties = new HashMap<>(); } @@ -103,4 +112,54 @@ public class 
CassandraMapping { public String getProperty(String key) { return this.tableProperties.get(key); } + + public Field getDefaultCassandraKey() { + Field field = new Field(); + field.setFieldName("defaultId"); + field.setColumnName("defaultId"); + field.setType("text"); + return field; + } + + public boolean isPartitionKeyDefined() { + if (cassandraKey == null) { + for (Field field : fieldList) { + if (Boolean.parseBoolean(field.getProperty(PRIMARY_KEY))) { + return true; + } + } + return false; + } + return true; + } + + public Class getKeyClass() { + return keyClass; + } + + public void setKeyClass(Class keyClass) { + this.keyClass = keyClass; + } + + public Class getPersistentClass() { + return persistentClass; + } + + public void setPersistentClass(Class persistentClass) { + this.persistentClass = persistentClass; + } + + public List<Field> getInlinedDefinedPartitionKeys() { + if(inlinedDefinedPartitionKeys != null) { + return inlinedDefinedPartitionKeys; + } else { + inlinedDefinedPartitionKeys = new ArrayList<>(); + for (Field field : fieldList) { + if (Boolean.parseBoolean(field.getProperty(PRIMARY_KEY))) { + inlinedDefinedPartitionKeys.add(field); + } + } + return inlinedDefinedPartitionKeys; + } + } } http://git-wip-us.apache.org/repos/asf/gora/blob/a51b719c/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraMappingBuilder.java ---------------------------------------------------------------------- diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraMappingBuilder.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraMappingBuilder.java index 0231ac3..d828d2f 100644 --- a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraMappingBuilder.java +++ b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraMappingBuilder.java @@ -46,7 +46,7 @@ public class CassandraMappingBuilder<K, T extends Persistent> { */ @SuppressWarnings("all") public 
CassandraMapping readMapping(String filename) throws IOException { - CassandraMapping map = new CassandraMapping(); + CassandraMapping cassandraMapping = new CassandraMapping(); Class keyClass = dataStore.getKeyClass(); Class persistentClass = dataStore. getPersistentClass(); try { @@ -66,14 +66,16 @@ public class CassandraMappingBuilder<K, T extends Persistent> { classMatched = true; String tableName = classElement.getAttributeValue("table"); - map.setCoreName(tableName); + cassandraMapping.setCoreName(tableName); + cassandraMapping.setKeyClass(dataStore.getKeyClass()); + cassandraMapping.setPersistentClass(dataStore.getPersistentClass()); List classAttributes = classElement.getAttributes(); for (Object anAttributeList : classAttributes) { Attribute attribute = (Attribute) anAttributeList; String attributeName = attribute.getName(); String attributeValue = attribute.getValue(); - map.addProperty(attributeName, attributeValue); + cassandraMapping.addProperty(attributeName, attributeValue); } List<Element> fields = classElement.getChildren("field"); @@ -83,7 +85,7 @@ public class CassandraMappingBuilder<K, T extends Persistent> { List fieldAttributes = field.getAttributes(); processAttributes(fieldAttributes, cassandraField); - map.addCassandraField(cassandraField); + cassandraMapping.addCassandraField(cassandraField); } break; } @@ -96,7 +98,7 @@ public class CassandraMappingBuilder<K, T extends Persistent> { LOG.error("Check that 'keyClass' and 'name' parameters in {} no mapping has been initialized for {} class mapping", filename, persistentClass); } - String keyspaceName = map.getProperty("keyspace"); + String keyspaceName = cassandraMapping.getProperty("keyspace"); if (keyspaceName != null) { KeySpace keyspace; for (Element keyspaceElement : keyspaces) { @@ -135,13 +137,16 @@ public class CassandraMappingBuilder<K, T extends Persistent> { } break; } - map.setKeySpace(keyspace); + cassandraMapping.setKeySpace(keyspace); break; } } } + else { + throw new 
RuntimeException("KeySpace couldn't be able to found in the cassandra mapping. Please configure the cassandra mapping correctly."); + } for (Element key : keys) { if (keyClass.getName().equals(key.getAttributeValue("name"))) { @@ -172,7 +177,7 @@ public class CassandraMappingBuilder<K, T extends Persistent> { } //process cluster keys - List<Element> clusterKeyFields = clusterKeys.getChildren("field"); + List<Element> clusterKeyFields = clusterKeys.getChildren("key"); for (Element clusterKeyField : clusterKeyFields) { ClusterKeyField keyField = new ClusterKeyField(); List fieldAttributes = clusterKeyField.getAttributes(); @@ -181,32 +186,25 @@ public class CassandraMappingBuilder<K, T extends Persistent> { String attributeName = attribute.getName(); String attributeValue = attribute.getValue(); switch (attributeName) { - case "name": - keyField.setFieldName(attributeValue); - break; case "column": keyField.setColumnName(attributeValue); break; - case "type": - keyField.setType(attributeValue.replace("(","<").replace(")",">")); - break; case "order": keyField.setOrder(ClusterKeyField.Order.valueOf(attributeValue.toUpperCase(Locale.ENGLISH))); break; default: - keyField.addProperty(attributeName, attributeValue); - break; + throw new RuntimeException(""); } } cassandraKey.addClusterKeyField(keyField); } - map.setCassandraKey(cassandraKey); + cassandraMapping.setCassandraKey(cassandraKey); } } } catch (Exception ex) { throw new IOException(ex); } - return map; + return cassandraMapping; } private void processAttributes(List<Element> attributes, Field fieldKey) { http://git-wip-us.apache.org/repos/asf/gora/blob/a51b719c/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java ---------------------------------------------------------------------- diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java index 
5e209d9..8b04442 100644 --- a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java +++ b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java @@ -90,7 +90,7 @@ public class CassandraStore<K, T extends Persistent> implements DataStore<K, T> mapping = mappingBuilder.readMapping(mappingFile); CassandraClient cassandraClient = new CassandraClient(); cassandraClient.initialize(properties, mapping); - cassandraSerializer = CassandraSerializer.getSerializer(cassandraClient, properties.getProperty(CassandraStoreParameters.CASSANDRA_SERIALIZATION_TYPE), keyClass, persistentClass, mapping); + cassandraSerializer = CassandraSerializer.getSerializer(cassandraClient, properties.getProperty(CassandraStoreParameters.CASSANDRA_SERIALIZATION_TYPE), this, mapping); } catch (Exception e) { throw new RuntimeException("Error while initializing Cassandra store: " + e.getMessage(), e); } http://git-wip-us.apache.org/repos/asf/gora/blob/a51b719c/gora-cassandra-cql/src/test/conf/avro/gora-cassandra-mapping.xml ---------------------------------------------------------------------- diff --git a/gora-cassandra-cql/src/test/conf/avro/gora-cassandra-mapping.xml b/gora-cassandra-cql/src/test/conf/avro/gora-cassandra-mapping.xml index 8c970dc..74e4b1c 100644 --- a/gora-cassandra-cql/src/test/conf/avro/gora-cassandra-mapping.xml +++ b/gora-cassandra-cql/src/test/conf/avro/gora-cassandra-mapping.xml @@ -17,38 +17,6 @@ limitations under the License. --> -<!-- - The value of 'host' attribute of keyspace tag should match exactly what is in - gora.properties file. Essentially this means that if you are using port number, you should - use it every where regardless of whether it is the default port or not. 
- At runtime Gora will otherwise try to connect to localhost - https://issues.apache.org/jira/browse/GORA-269 - - The values of 'replication_factor' and 'placement_strategy' attribute of keyspace tag - only apply if gora create the kyespace. they have no effect if this is being used against - an existing keyspace. the default value for 'replication_factor' is '1' - - The value of 'placement_strategy' should be a fully qualifed class name that is known to - the cassansra cluster, not the application or gora. As of this writing, the classes that ship - with cassandra are: - 'org.apache.cassandra.locator.SimpleStrategy' - 'org.apache.cassandra.locator.NetworkTopologyStrategy' - gora cassandra would use SimpleStrategy by default if no value for this attribute is specified - - The default value of 'gc_grace_seconds' is '0' which is ONLY VIABLE FOR SINGLE NODE - CLUSTER. you should update this value according to your cluster configuration. - https://wiki.apache.org/cassandra/StorageConfiguration - - The value of 'ttl' (time to live) attribute of field tag should most likely always - be zero unless you want Cassandra to create Tombstones and delete portions of your - data once this period expires. Any positive value is read and bound to the number - of seconds after which the value for that field will disappear. 
The default value of ttl - is '0' - - More information on gora-cassandra configuration and mapping's can be found - at http://gora.apache.org/current/gora-cassandra.html ---> - <gora-otd> <keyspace name="avroKeySpace" durableWrite="false"> @@ -56,18 +24,21 @@ </keyspace> <class name="org.apache.gora.examples.generated.WebPage" keyClass="java.lang.String" table="WebPage" keyspace="avroKeySpace"> - <field name="url" column="url" length="128" primarykey="true"/> + <field name="url" column="url" type="ascii" /> <field name="content" column="content" type="blob"/> - <field name="parsedContent" column="parsedContent" type="list"/> - <field name="outlinks" column="outlinks" type="map"/> + <field name="parsedContent" column="parsedContent" type="list(ascii)"/> + <field name="outlinks" column="outlinks" type="map(text,text)"/> + <field name="headers" column="headers" type="map(text,text)"/> + <field name="byteData" column="byteData" type="map(text,blob)"/> + <field name="stringData" column="stringData" type="map(text,ascii)"/> </class> <class name="org.apache.gora.examples.generated.Employee" keyClass="java.lang.String" keyspace="avroKeySpace" table="Employee" compactStorage="true" > - <field name="name" column="name" type="text" ttl="10"/> - <field name="dateOfBirth" column="dob" type="bigint" ttl="10"/> - <field name="ssn" column="ssn" type="text" ttl="10" primarykey="true"/> - <field name="salary" column="salary" type="int" ttl="10" /> + <field name="name" column="name" type="text"/> + <field name="dateOfBirth" column="dob" type="bigint" /> + <field name="ssn" column="ssn" type="text" /> + <field name="salary" column="salary" type="int" /> </class> </gora-otd> http://git-wip-us.apache.org/repos/asf/gora/blob/a51b719c/gora-cassandra-cql/src/test/conf/compositeKey/gora-cassandra-mapping.xml ---------------------------------------------------------------------- diff --git a/gora-cassandra-cql/src/test/conf/compositeKey/gora-cassandra-mapping.xml 
b/gora-cassandra-cql/src/test/conf/compositeKey/gora-cassandra-mapping.xml index 556d553..f3b3272 100644 --- a/gora-cassandra-cql/src/test/conf/compositeKey/gora-cassandra-mapping.xml +++ b/gora-cassandra-cql/src/test/conf/compositeKey/gora-cassandra-mapping.xml @@ -55,23 +55,30 @@ <placementStrategy name="SimpleStrategy" replication_factor="1"/> </keyspace> - <class name="org.apache.gora.examples.generated.Employee" keyClass="org.apache.gora.examples.generated.WebPage" keyspace="EmployeeSpace" - table="Employee" compactStorage="true" id="31323131"> - <field name="name" column="name" type="text" ttl="10"/> - <field name="dateOfBirth" column="dob" type="timestamp" ttl="10"/> + <class name="org.apache.gora.cassandra.example.generated.avroSerialization.CassandraRecord" keyClass="org.apache.gora.cassandra.example.generated.avroSerialization.CassandraKey" keyspace="EmployeeSpace" + table="CassandraRecord" allowFiltering="true" id="5a1c395e-b41f-11e5-9f22-ba0be0483c18"> + <field name="dataString" column="name" type="text"/> + <field name="dataInt" column="age" type="int"/> + <field name="dataLong" column="salary" type="bigint"/> + <field name="dataDouble" column="testDouble" type="double"/> + <field name="dataBytes" column="quotes" type="blob"/> + <field name="arrayInt" column="listInt" type="list(int)"/> + <field name="arrayString" column="listString" type="list(text)"/> + <field name="arrayLong" column="listLong" type="list(bigint)"/> + <field name="arrayDouble" column="listDouble" type="list(double)"/> + <field name="mapInt" column="mapInt" type="map(text,int)"/> + <field name="mapString" column="mapString" type="map(text,text)"/> + <field name="mapLong" column="mapLong" type="map(text,bigint)"/> + <field name="mapDouble" column="mapDouble" type="map(text,double)"/> </class> - <cassandraKey name="org.apache.gora.examples.generated.WebPage"> + <cassandraKey name="org.apache.gora.cassandra.example.generated.avroSerialization.CassandraKey"> <partitionKey> - 
<compositeKey> - <field name="city" column="city" type="text"/> - <field name="country" column="country" type="text"/> - </compositeKey> - <field name="employerId" column="empID" type="int"/> - <field name="departmentId" column="deptID" type="int"/> + <field name="url" column="urlData" type="text"/> + <field name="timestamp" column="timestampData" type="bigint"/> </partitionKey> <clusterKey> - <field name="joinDate" column="joinDate" type="timestamp" order="desc"/> + <key column="timestampData" order="desc"/> </clusterKey> </cassandraKey> http://git-wip-us.apache.org/repos/asf/gora/blob/a51b719c/gora-cassandra-cql/src/test/conf/gora-cassandra-mapping.xml ---------------------------------------------------------------------- diff --git a/gora-cassandra-cql/src/test/conf/gora-cassandra-mapping.xml b/gora-cassandra-cql/src/test/conf/gora-cassandra-mapping.xml index 5f59ba8..90f5267 100644 --- a/gora-cassandra-cql/src/test/conf/gora-cassandra-mapping.xml +++ b/gora-cassandra-cql/src/test/conf/gora-cassandra-mapping.xml @@ -62,7 +62,7 @@ </placementStrategy> </keyspace> - <class name="org.apache.gora.examples.generated.Employee1" keyClass="java.lang.String" keyspace="EmployeeSpace" + <class name="org.apache.gora.examples.generated.Employee" keyClass="java.lang.String" keyspace="EmployeeSpace" table="Employee" compactStorage="true" id="31323131"> <field name="lname" column="name" type="text" ttl="10" static="true"/> <field name="fname" column="name" type="text" ttl="10"/> http://git-wip-us.apache.org/repos/asf/gora/blob/a51b719c/gora-cassandra-cql/src/test/java/org/apache/gora/cassandra/store/TestCassandraStoreWithCassandraKey.java ---------------------------------------------------------------------- diff --git a/gora-cassandra-cql/src/test/java/org/apache/gora/cassandra/store/TestCassandraStoreWithCassandraKey.java b/gora-cassandra-cql/src/test/java/org/apache/gora/cassandra/store/TestCassandraStoreWithCassandraKey.java new file mode 100644 index 0000000..82f6f58 --- 
/dev/null +++ b/gora-cassandra-cql/src/test/java/org/apache/gora/cassandra/store/TestCassandraStoreWithCassandraKey.java @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gora.cassandra.store; + +import org.apache.avro.util.Utf8; +import org.apache.gora.cassandra.GoraCassandraTestDriver; +import org.apache.gora.cassandra.example.generated.avroSerialization.CassandraKey; +import org.apache.gora.cassandra.example.generated.avroSerialization.CassandraRecord; +import org.apache.gora.query.Query; +import org.apache.gora.query.Result; +import org.apache.gora.store.DataStore; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.nio.ByteBuffer; +import java.nio.charset.Charset; +import java.util.Properties; + +/** + * This class tests Cassandra Store functionality with CassandraKey. 
+ */ +public class TestCassandraStoreWithCassandraKey { + private static GoraCassandraTestDriver testDriver = new GoraCassandraTestDriver(); + private static DataStore<CassandraKey, CassandraRecord> cassandraRecordDataStore; + private static Properties parameter; + + @BeforeClass + public static void setUpClass() throws Exception { + setProperties(); + testDriver.setParameters(parameter); + testDriver.setUpClass(); + cassandraRecordDataStore = testDriver.createDataStore(CassandraKey.class, CassandraRecord.class); + } + + private static void setProperties() { + parameter = new Properties(); + parameter.setProperty(CassandraStoreParameters.CASSANDRA_SERVERS, "localhost"); + parameter.setProperty(CassandraStoreParameters.PORT, "9042"); + parameter.setProperty(CassandraStoreParameters.CASSANDRA_SERIALIZATION_TYPE, "avro"); + parameter.setProperty(CassandraStoreParameters.PROTOCOL_VERSION, "3"); + parameter.setProperty(CassandraStoreParameters.CLUSTER_NAME, "Test Cluster"); + parameter.setProperty("gora.cassandrastore.mapping.file", "compositeKey/gora-cassandra-mapping.xml"); + } + + @After + public void tearDown() throws Exception { + testDriver.tearDown(); + } + + @AfterClass + public static void tearDownClass() throws Exception { + testDriver.tearDownClass(); + } + + + /** + * In this test case, schema exists method behavior of the data store is testing. + */ + @Test + public void testSchemaRelatedBehaviour() { + cassandraRecordDataStore.createSchema(); + Assert.assertTrue(cassandraRecordDataStore.schemaExists()); + cassandraRecordDataStore.deleteSchema(); + Assert.assertFalse(cassandraRecordDataStore.schemaExists()); + cassandraRecordDataStore.createSchema(); + Assert.assertTrue(cassandraRecordDataStore.schemaExists()); + } + + @Test + public void testSimplePutGet() { + cassandraRecordDataStore.createSchema(); + CassandraRecord record = new CassandraRecord(); + record.setDataLong(719411002L); + record.setDataString(new Utf8("M.K.H. 
Gunasekara")); + record.setDataInt(144); + record.setDataBytes(ByteBuffer.wrap("No 144, Gunasekara Mawatha, Mattumgala, Ragama".getBytes(Charset.defaultCharset()))); + record.setDataDouble(3.14159d); + CassandraKey key = new CassandraKey(); + key.setTimestamp(2027L); + key.setUrl("www.apache.org"); + cassandraRecordDataStore.put(key, record); + CassandraRecord retrievedRecord = cassandraRecordDataStore.get(key); + Assert.assertEquals(record.getDataInt(), retrievedRecord.getDataInt()); + Assert.assertEquals(record.getDataString(), retrievedRecord.getDataString()); + Assert.assertEquals(record.getDataLong(), retrievedRecord.getDataLong()); + Assert.assertEquals(record.getDataBytes(), retrievedRecord.getDataBytes()); + Assert.assertEquals(record.getDataDouble(), retrievedRecord.getDataDouble()); + cassandraRecordDataStore.delete(key); + Assert.assertNull(cassandraRecordDataStore.get(key)); + } + + @Test + public void testExecuteQuery() throws Exception { + Query query = cassandraRecordDataStore.newQuery(); + cassandraRecordDataStore.truncateSchema(); + CassandraKey key = new CassandraKey(); + key.setTimestamp(2027L); + key.setUrl("www.apache.org"); + query.setKey(key); + Result result = query.execute(); + Assert.assertFalse(result.next()); + CassandraRecord record = new CassandraRecord(); + record.setDataLong(719411002L); + record.setDataString(new Utf8("M.K.H. 
Gunasekara")); + record.setDataInt(144); + record.setDataBytes(ByteBuffer.wrap("No 144, Gunasekara Mawatha, Mattumgala, Ragama".getBytes(Charset.defaultCharset()))); + record.setDataDouble(3.14159d); + // test simple put and query with setKey + cassandraRecordDataStore.put(key, record); + CassandraRecord retrievedRecord = cassandraRecordDataStore.get(key); + Assert.assertEquals(record.getDataInt(), retrievedRecord.getDataInt()); + Assert.assertEquals(record.getDataString(), retrievedRecord.getDataString()); + Assert.assertEquals(record.getDataLong(), retrievedRecord.getDataLong()); + Assert.assertEquals(record.getDataBytes(), retrievedRecord.getDataBytes()); + Assert.assertEquals(record.getDataDouble(), retrievedRecord.getDataDouble()); + result = query.execute(); + Assert.assertTrue(result.next()); + // verify data + retrievedRecord = (CassandraRecord) result.get(); + Assert.assertEquals(record.getDataInt(), retrievedRecord.getDataInt()); + Assert.assertEquals(record.getDataString(), retrievedRecord.getDataString()); + Assert.assertEquals(record.getDataLong(), retrievedRecord.getDataLong()); + Assert.assertEquals(record.getDataBytes(), retrievedRecord.getDataBytes()); + Assert.assertEquals(record.getDataDouble(), retrievedRecord.getDataDouble()); + // test delete by query + cassandraRecordDataStore.deleteByQuery(query); + result = query.execute(); + Assert.assertFalse(result.next()); + // test empty query + Query emptyQuery = cassandraRecordDataStore.newQuery(); + result = emptyQuery.execute(); + Assert.assertFalse(result.next()); + cassandraRecordDataStore.put(key, record); + result = query.execute(); + Assert.assertTrue(result.next()); + + // test Range with Query + cassandraRecordDataStore.truncateSchema(); + //insert data + CassandraRecord record1 = new CassandraRecord(); + CassandraRecord record2 = new CassandraRecord(); + CassandraRecord record3 = new CassandraRecord(); + CassandraRecord record4 = new CassandraRecord(); + record1.setDataLong(719411002L); + 
record1.setDataString(new Utf8("Madawa")); + record1.setDataInt(100); + record2.setDataLong(712778588L); + record2.setDataString(new Utf8("Kasun")); + record2.setDataInt(101); + record3.setDataLong(716069539L); + record3.setDataString(new Utf8("Charith")); + record3.setDataInt(102); + record4.setDataLong(112956051L); + record4.setDataString(new Utf8("Bhanuka")); + record4.setDataInt(103); + CassandraKey key1 = new CassandraKey(); + key1.setTimestamp(200L); + key1.setUrl("www.apache.org"); + CassandraKey key2 = new CassandraKey(); + key2.setTimestamp(205L); + key2.setUrl("www.apache.org"); + CassandraKey key3 = new CassandraKey(); + key3.setTimestamp(210L); + key3.setUrl("www.apache.org"); + CassandraKey key4 = new CassandraKey(); + key4.setTimestamp(215L); + key4.setUrl("www.apache.org"); + cassandraRecordDataStore.put(key1,record1); + cassandraRecordDataStore.put(key2,record2); + cassandraRecordDataStore.put(key3,record3); + cassandraRecordDataStore.put(key4,record4); + Query rangeQuery = cassandraRecordDataStore.newQuery(); + rangeQuery.setStartKey(key2); + rangeQuery.setEndKey(key2); + result = rangeQuery.execute(); + int i = 0; + while (result.next()) { + i++; + } + Assert.assertEquals(1,i); + + rangeQuery.setStartKey(key2); + rangeQuery.setEndKey(key3); + result = rangeQuery.execute(); + i = 0; + while (result.next()) { + i++; + } + Assert.assertEquals(2,i); + } + +} http://git-wip-us.apache.org/repos/asf/gora/blob/a51b719c/gora-cassandra-cql/src/test/java/org/apache/gora/cassandra/store/TestCassandraStoreWithNativeSerialization.java ---------------------------------------------------------------------- diff --git a/gora-cassandra-cql/src/test/java/org/apache/gora/cassandra/store/TestCassandraStoreWithNativeSerialization.java b/gora-cassandra-cql/src/test/java/org/apache/gora/cassandra/store/TestCassandraStoreWithNativeSerialization.java index 88d6267..f3df5e4 100644 --- 
a/gora-cassandra-cql/src/test/java/org/apache/gora/cassandra/store/TestCassandraStoreWithNativeSerialization.java +++ b/gora-cassandra-cql/src/test/java/org/apache/gora/cassandra/store/TestCassandraStoreWithNativeSerialization.java @@ -81,7 +81,7 @@ public class TestCassandraStoreWithNativeSerialization { * In this test case, put and get behavior of the data store are testing. */ @Test - public void testSimplePutandGet() { + public void testSimplePutAndGet() { UUID id = UUID.randomUUID(); User user1 = new User(id, "madhawa", Date.from(Instant.now())); // storing data; @@ -96,7 +96,7 @@ public class TestCassandraStoreWithNativeSerialization { * In this test case, put and delete behavior of the data store are testing. */ @Test - public void testSimplePutDeleteandGet() { + public void testSimplePutDeleteAndGet() { UUID id = UUID.randomUUID(); User user1 = new User(id, "kasun", Date.from(Instant.now())); // storing data;
