http://git-wip-us.apache.org/repos/asf/gora/blob/89683c74/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/CassandraQueryFactory.java ---------------------------------------------------------------------- diff --git a/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/CassandraQueryFactory.java b/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/CassandraQueryFactory.java new file mode 100644 index 0000000..928370c --- /dev/null +++ b/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/CassandraQueryFactory.java @@ -0,0 +1,836 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gora.cassandra.serializers; + +import com.datastax.driver.core.querybuilder.BuiltStatement; +import com.datastax.driver.core.querybuilder.Delete; +import com.datastax.driver.core.querybuilder.QueryBuilder; +import com.datastax.driver.core.querybuilder.Select; +import com.datastax.driver.core.querybuilder.Update; +import com.datastax.driver.mapping.annotations.UDT; +import org.apache.avro.Schema; +import org.apache.gora.cassandra.bean.CassandraKey; +import org.apache.gora.cassandra.bean.ClusterKeyField; +import org.apache.gora.cassandra.bean.Field; +import org.apache.gora.cassandra.bean.KeySpace; +import org.apache.gora.cassandra.bean.PartitionKeyField; +import org.apache.gora.cassandra.query.CassandraQuery; +import org.apache.gora.cassandra.store.CassandraMapping; +import org.apache.gora.cassandra.store.CassandraStore; +import org.apache.gora.query.Query; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.lang.reflect.ParameterizedType; +import java.lang.reflect.Type; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; + +/** + * This class is used create Cassandra Queries. + */ +class CassandraQueryFactory { + + private static final Logger LOG = LoggerFactory.getLogger(CassandraQueryFactory.class); + + /** + * This method returns the CQL query to create key space. + * refer : http://docs.datastax.com/en/cql/3.1/cql/cql_reference/create_keyspace_r.html + * + * @param mapping Cassandra Mapping {@link CassandraMapping} + * @return CQL Query + */ + static String getCreateKeySpaceQuery(CassandraMapping mapping) { + KeySpace keySpace = mapping.getKeySpace(); + StringBuilder stringBuffer = new StringBuilder(); + stringBuffer.append("CREATE KEYSPACE IF NOT EXISTS ").append(keySpace.getName()).append(" WITH REPLICATION = { 'class' : "); + KeySpace.PlacementStrategy placementStrategy = keySpace.getPlacementStrategy(); + stringBuffer.append("'").append(placementStrategy).append("'").append(", ").append("'"); + switch (placementStrategy) { + case SimpleStrategy: + stringBuffer.append("replication_factor").append("'").append(" : ").append(keySpace.getReplicationFactor()).append(" }"); + break; + case NetworkTopologyStrategy: + boolean isCommaNeeded = false; + for (Map.Entry<String, Integer> entry : keySpace.getDataCenters().entrySet()) { + if (isCommaNeeded) { + stringBuffer.append(", '"); + } + stringBuffer.append(entry.getKey()).append("'").append(" : ").append(entry.getValue()); + isCommaNeeded = true; + } + stringBuffer.append(" }"); + break; + } + if (keySpace.isDurableWritesEnabled()) { + stringBuffer.append(" AND DURABLE_WRITES = ").append(keySpace.isDurableWritesEnabled()); + } + return stringBuffer.toString(); + } + + /** + * This method returns the CQL query to table. + * refer : http://docs.datastax.com/en/cql/3.1/cql/cql_reference/create_table_r.html + * <p> + * Trick : To have a consistency of the order of the columns, first we append partition keys, second cluster keys and finally other columns. + * It's very much needed to follow the same order in other CRUD operations as well. + * + * @param mapping Cassandra mapping {@link CassandraMapping} + * @return CQL Query + */ + static String getCreateTableQuery(CassandraMapping mapping) { + StringBuilder stringBuffer = new StringBuilder(); + stringBuffer.append("CREATE TABLE IF NOT EXISTS ").append(mapping.getKeySpace().getName()).append(".").append(mapping.getCoreName()).append(" ("); + CassandraKey cassandraKey = mapping.getCassandraKey(); + // appending Cassandra Persistent columns into db schema + processFieldsForCreateTableQuery(mapping.getFieldList(), false, stringBuffer); + + if (cassandraKey != null) { + processFieldsForCreateTableQuery(cassandraKey.getFieldList(), true, stringBuffer); + List<PartitionKeyField> partitionKeys = cassandraKey.getPartitionKeyFields(); + if (partitionKeys != null) { + stringBuffer.append(", PRIMARY KEY ("); + boolean isCommaNeededToApply = false; + for (PartitionKeyField keyField : partitionKeys) { + if (isCommaNeededToApply) { + stringBuffer.append(","); + } + if (keyField.isComposite()) { + stringBuffer.append("("); + boolean isCommaNeededHere = false; + for (Field field : keyField.getFields()) { + if (isCommaNeededHere) { + stringBuffer.append(", "); + } + stringBuffer.append(field.getColumnName()); + isCommaNeededHere = true; + } + stringBuffer.append(")"); + } else { + stringBuffer.append(keyField.getColumnName()); + } + isCommaNeededToApply = true; + } + stringBuffer.append(")"); + } + } + + stringBuffer.append(")"); + boolean isWithNeeded = true; + if (Boolean.parseBoolean(mapping.getProperty("compactStorage"))) { + stringBuffer.append(" WITH COMPACT STORAGE "); + isWithNeeded = false; + } + + String id = mapping.getProperty("id"); + if (id != null) { + if (isWithNeeded) { + stringBuffer.append(" WITH "); + } else { + stringBuffer.append(" AND "); + } + stringBuffer.append("ID = '").append(id).append("'"); + isWithNeeded = false; + } + if (cassandraKey != null) { + List<ClusterKeyField> clusterKeyFields = cassandraKey.getClusterKeyFields(); + if (clusterKeyFields != null) { + if (isWithNeeded) { + stringBuffer.append(" WITH "); + } else { + stringBuffer.append(" AND "); + } + stringBuffer.append(" CLUSTERING ORDER BY ("); + boolean isCommaNeededToApply = false; + for (ClusterKeyField keyField : clusterKeyFields) { + if (isCommaNeededToApply) { + stringBuffer.append(", "); + } + stringBuffer.append(keyField.getColumnName()).append(" "); + if (keyField.getOrder() != null) { + stringBuffer.append(keyField.getOrder()); + } + isCommaNeededToApply = true; + } + stringBuffer.append(")"); + } + } + return stringBuffer.toString(); + } + + private static void processFieldsForCreateTableQuery(List<Field> fields, boolean isCommaNeeded, StringBuilder stringBuilder) { + for (Field field : fields) { + if (isCommaNeeded) { + stringBuilder.append(", "); + } + stringBuilder.append(field.getColumnName()).append(" ").append(field.getType()); + boolean isStaticColumn = Boolean.parseBoolean(field.getProperty("static")); + boolean isPrimaryKey = Boolean.parseBoolean(field.getProperty("primarykey")); + if (isStaticColumn) { + stringBuilder.append(" STATIC"); + } + if (isPrimaryKey) { + stringBuilder.append(" PRIMARY KEY "); + } + isCommaNeeded = true; + } + } + + /** + * This method returns the CQL query to drop table. + * refer : http://docs.datastax.com/en/cql/3.1/cql/cql_reference/drop_table_r.html + * + * @param mapping Cassandra Mapping {@link CassandraMapping} + * @return CQL query + */ + static String getDropTableQuery(CassandraMapping mapping) { + return "DROP TABLE IF EXISTS " + mapping.getKeySpace().getName() + "." + mapping.getCoreName(); + } + + /** + * This method returns the CQL query to drop key space. + * refer : http://docs.datastax.com/en/cql/3.1/cql/cql_reference/drop_keyspace_r.html + * + * @param mapping Cassandra Mapping {@link CassandraMapping} + * @return CQL query + */ + static String getDropKeySpaceQuery(CassandraMapping mapping) { + return "DROP KEYSPACE IF EXISTS " + mapping.getKeySpace().getName(); + } + + /** + * This method returns the CQL query to truncate (removes all the data) in the table. + * refer : http://docs.datastax.com/en/cql/3.1/cql/cql_reference/truncate_r.html + * + * @param mapping Cassandra Mapping {@link CassandraMapping} + * @return CQL query + */ + static String getTruncateTableQuery(CassandraMapping mapping) { + return QueryBuilder.truncate(mapping.getKeySpace().getName(), mapping.getCoreName()).getQueryString(); + } + + /** + * This method return the CQL query to insert data in to the table. + * refer : http://docs.datastax.com/en/cql/3.1/cql/cql_reference/insert_r.html + * + * @param mapping Cassandra Mapping {@link CassandraMapping} + * @param fields available fields + * @return CQL Query + */ + static String getInsertDataQuery(CassandraMapping mapping, List<String> fields) { + String[] columnNames = getColumnNames(mapping, fields); + String[] objects = new String[fields.size()]; + Arrays.fill(objects, "?"); + return QueryBuilder.insertInto(mapping.getKeySpace().getName(), mapping.getCoreName()).values(columnNames, objects).getQueryString(); + } + + /** + * This method return the CQL query to delete a persistent in the table. + * refer : http://docs.datastax.com/en/cql/3.3/cql/cql_reference/cqlDelete.html + * + * @param mapping Cassandra Mapping {@link CassandraMapping} + * @param fields filed list to be deleted + * @return CQL Query + */ + static String getDeleteDataQuery(CassandraMapping mapping, List<String> fields) { + String[] columnNames = getColumnNames(mapping, fields); + String[] objects = new String[fields.size()]; + Arrays.fill(objects, "?"); + Delete delete = QueryBuilder.delete().from(mapping.getKeySpace().getName(), mapping.getCoreName()); + return processKeys(columnNames, delete); + } + + private static String processKeys(String[] columnNames, BuiltStatement delete) { + BuiltStatement query = null; + boolean isWhereNeeded = true; + for (String columnName : columnNames) { + if (isWhereNeeded) { + if (delete instanceof Delete) { + query = ((Delete) delete).where(QueryBuilder.eq(columnName, "?")); + } else { + query = ((Select) delete).where(QueryBuilder.eq(columnName, "?")); + } + isWhereNeeded = false; + } else { + if (delete instanceof Delete) { + query = ((Delete.Where) query).and(QueryBuilder.eq(columnName, "?")); + } else { + query = ((Select.Where) query).and(QueryBuilder.eq(columnName, "?")); + } + } + } + return query != null ? query.getQueryString() : null; + } + + /** + * This method returns the CQL Select query to retrieve data from the table. + * refer: http://docs.datastax.com/en/cql/3.3/cql/cql_reference/cqlSelect.html + * + * @param mapping Cassandra Mapping {@link CassandraMapping} + * @param keyFields key fields + * @return CQL Query + */ + static String getSelectObjectQuery(CassandraMapping mapping, List<String> keyFields) { + Select select = QueryBuilder.select().from(mapping.getKeySpace().getName(), mapping.getCoreName()); + if (Boolean.parseBoolean(mapping.getProperty("allowFiltering"))) { + select.allowFiltering(); + } + String[] columnNames = getColumnNames(mapping, keyFields); + return processKeys(columnNames, select); + } + + /** + * This method returns CQL Select query to retrieve data from the table with given fields. + * This method is used for Avro Serialization + * refer: http://docs.datastax.com/en/cql/3.3/cql/cql_reference/cqlSelect.html + * + * @param mapping Cassandra Mapping {@link CassandraMapping} + * @param fields Given fields to retrieve + * @param keyFields key fields + * @return CQL Query + */ + static String getSelectObjectWithFieldsQuery(CassandraMapping mapping, String[] fields, List<String> keyFields) { + Select select = QueryBuilder.select(getColumnNames(mapping, Arrays.asList(fields))).from(mapping.getKeySpace().getName(), mapping.getCoreName()); + if (Boolean.parseBoolean(mapping.getProperty("allowFiltering"))) { + select.allowFiltering(); + } + String[] columnNames = getColumnNames(mapping, keyFields); + return processKeys(columnNames, select); + } + + /** + * This method returns CQL Select query to retrieve data from the table with given fields. + * This method is used for Native Serialization + * refer: http://docs.datastax.com/en/cql/3.3/cql/cql_reference/cqlSelect.html + * + * @param mapping Cassandra Mapping {@link CassandraMapping} + * @param fields Given fields to retrieve + * @return CQL Query + */ + static String getSelectObjectWithFieldsQuery(CassandraMapping mapping, String[] fields) { + String cqlQuery = null; + String[] columnNames = getColumnNames(mapping, Arrays.asList(fields)); + Select select = QueryBuilder.select(columnNames).from(mapping.getKeySpace().getName(), mapping.getCoreName()); + if (Boolean.parseBoolean(mapping.getProperty("allowFiltering"))) { + select.allowFiltering(); + } + CassandraKey cKey = mapping.getCassandraKey(); + if (cKey != null) { + Select.Where query = null; + boolean isWhereNeeded = true; + for (Field field : cKey.getFieldList()) { + if (isWhereNeeded) { + query = select.where(QueryBuilder.eq(field.getColumnName(), "?")); + isWhereNeeded = false; + } else { + query = query.and(QueryBuilder.eq(field.getColumnName(), "?")); + } + } + cqlQuery = query != null ? query.getQueryString() : null; + } else { + for (Field field : mapping.getFieldList()) { + boolean isPrimaryKey = Boolean.parseBoolean(field.getProperty("primarykey")); + if (isPrimaryKey) { + cqlQuery = select.where(QueryBuilder.eq(field.getColumnName(), "?")).getQueryString(); + break; + } + } + } + return cqlQuery; + } + + + /** + * This method returns CQL Query for execute method. This CQL contains a Select Query to retrieve data from the table + * + * @param mapping Cassandra Mapping {@link CassandraMapping} + * @param cassandraQuery Query {@link CassandraQuery} + * @param objects object list + * @return CQL Query + */ + static String getExecuteQuery(CassandraMapping mapping, Query cassandraQuery, List<Object> objects, String[] fields) { + long limit = cassandraQuery.getLimit(); + Select select = QueryBuilder.select(getColumnNames(mapping, Arrays.asList(fields))).from(mapping.getKeySpace().getName(), mapping.getCoreName()); + if (limit > 0) { + select = select.limit((int) limit); + } + if (Boolean.parseBoolean(mapping.getProperty("allowFiltering"))) { + select.allowFiltering(); + } + return processQuery(cassandraQuery, select, mapping, objects); + } + + private static String processQuery(Query cassandraQuery, BuiltStatement select, CassandraMapping mapping, List<Object> objects) { + String primaryKey = null; + BuiltStatement query = null; + Object startKey = cassandraQuery.getStartKey(); + Object endKey = cassandraQuery.getEndKey(); + Object key = cassandraQuery.getKey(); + boolean isWhereNeeded = true; + if (key != null) { + if (mapping.getCassandraKey() != null) { + ArrayList<String> cassandraKeys = new ArrayList<>(); + ArrayList<Object> cassandraValues = new ArrayList<>(); + AvroCassandraUtils.processKeys(mapping, key, cassandraKeys, cassandraValues); + String[] columnKeys = getColumnNames(mapping, cassandraKeys); + for (int i = 0; i < cassandraKeys.size(); i++) { + if (isWhereNeeded) { + if (select instanceof Select) { + query = ((Select) select).where(QueryBuilder.eq(columnKeys[i], "?")); + } else if (select instanceof Delete) { + query = ((Delete) select).where(QueryBuilder.eq(columnKeys[i], "?")); + } else { + query = ((Update.Assignments) select).where(QueryBuilder.eq(columnKeys[i], "?")); + } + objects.add(cassandraValues.get(i)); + isWhereNeeded = false; + } else { + if (select instanceof Select) { + query = ((Select.Where) query).and(QueryBuilder.eq(columnKeys[i], "?")); + } else if (select instanceof Delete) { + query = ((Delete.Where) query).and(QueryBuilder.eq(columnKeys[i], "?")); + } else { + query = ((Update.Where) query).and(QueryBuilder.eq(columnKeys[i], "?")); + } + objects.add(cassandraValues.get(i)); + } + } + } else { + primaryKey = getPKey(mapping.getFieldList()); + if (select instanceof Select) { + query = ((Select) select).where(QueryBuilder.eq(primaryKey, "?")); + } else if (select instanceof Delete) { + query = ((Delete) select).where(QueryBuilder.eq(primaryKey, "?")); + } else { + query = ((Update.Assignments) select).where(QueryBuilder.eq(primaryKey, "?")); + } + objects.add(key); + } + } else { + if (startKey != null) { + if (mapping.getCassandraKey() != null) { + ArrayList<String> cassandraKeys = new ArrayList<>(); + ArrayList<Object> cassandraValues = new ArrayList<>(); + AvroCassandraUtils.processKeys(mapping, startKey, cassandraKeys, cassandraValues); + String[] columnKeys = getColumnNames(mapping, cassandraKeys); + for (int i = 0; i < cassandraKeys.size(); i++) { + if (isWhereNeeded) { + if (select instanceof Select) { + query = ((Select) select).where(QueryBuilder.gte(columnKeys[i], "?")); + } else if (select instanceof Delete) { + /* + According to the JIRA https://issues.apache.org/jira/browse/CASSANDRA-7651 this has been fixed, but It seems this not fixed yet. + */ + throw new RuntimeException("Delete by Query is not suppoted for Key Ranges."); + } else { + query = ((Update.Assignments) select).where(QueryBuilder.gte(columnKeys[i], "?")); + } + objects.add(cassandraValues.get(i)); + isWhereNeeded = false; + } else { + if (select instanceof Select) { + query = ((Select.Where) query).and(QueryBuilder.gte(columnKeys[i], "?")); + } else if (select instanceof Delete) { + /* + According to the JIRA https://issues.apache.org/jira/browse/CASSANDRA-7651 this has been fixed, but It seems this not fixed yet. + */ + throw new RuntimeException("Delete by Query is not suppoted for Key Ranges."); + } else { + query = ((Update.Where) query).and(QueryBuilder.gte(columnKeys[i], "?")); + } + objects.add(cassandraValues.get(i)); + } + } + } else { + primaryKey = getPKey(mapping.getFieldList()); + if (select instanceof Select) { + query = ((Select) select).where(QueryBuilder.gte(primaryKey, "?")); + } else if (select instanceof Delete) { + /* + According to the JIRA https://issues.apache.org/jira/browse/CASSANDRA-7651 this has been fixed, but It seems this not fixed yet. + */ + throw new RuntimeException("Delete by Query is not suppoted for Key Ranges."); + } else { + query = ((Update.Assignments) select).where(QueryBuilder.gte(primaryKey, "?")); + } + objects.add(startKey); + isWhereNeeded = false; + } + } + if (endKey != null) { + if (mapping.getCassandraKey() != null) { + ArrayList<String> cassandraKeys = new ArrayList<>(); + ArrayList<Object> cassandraValues = new ArrayList<>(); + AvroCassandraUtils.processKeys(mapping, endKey, cassandraKeys, cassandraValues); + String[] columnKeys = getColumnNames(mapping, cassandraKeys); + for (int i = 0; i < cassandraKeys.size(); i++) { + if (isWhereNeeded) { + if (select instanceof Select) { + query = ((Select) select).where(QueryBuilder.lte(columnKeys[i], "?")); + } else if (select instanceof Delete) { + /* + According to the JIRA https://issues.apache.org/jira/browse/CASSANDRA-7651 this has been fixed, but It seems this not fixed yet. + */ + throw new RuntimeException("Delete by Query is not suppoted for Key Ranges."); + } else { + query = ((Update.Assignments) select).where(QueryBuilder.lte(columnKeys[i], "?")); + } + objects.add(cassandraValues.get(i)); + isWhereNeeded = false; + } else { + if (select instanceof Select) { + query = ((Select.Where) query).and(QueryBuilder.lte(columnKeys[i], "?")); + } else if (select instanceof Delete) { + /* + According to the JIRA https://issues.apache.org/jira/browse/CASSANDRA-7651 this has been fixed, but It seems this not fixed yet. + */ + throw new RuntimeException("Delete by Query is not suppoted for Key Ranges."); + } else { + query = ((Update.Where) query).and(QueryBuilder.lte(columnKeys[i], "?")); + } + objects.add(cassandraValues.get(i)); + } + } + } else { + primaryKey = primaryKey != null ? primaryKey : getPKey(mapping.getFieldList()); + if (isWhereNeeded) { + if (select instanceof Select) { + query = ((Select) select).where(QueryBuilder.lte(primaryKey, "?")); + } else if (select instanceof Delete) { + /* + According to the JIRA https://issues.apache.org/jira/browse/CASSANDRA-7651 this has been fixed, but It seems this not fixed yet. + */ + throw new RuntimeException("Delete by Query is not suppoted for Key Ranges."); + } else { + query = ((Update.Assignments) select).where(QueryBuilder.lte(primaryKey, "?")); + } + } else { + if (select instanceof Select) { + query = ((Select.Where) query).and(QueryBuilder.lte(primaryKey, "?")); + } else if (select instanceof Delete) { + /* + According to the JIRA https://issues.apache.org/jira/browse/CASSANDRA-7651 this has been fixed, but It seems this not fixed yet. + */ + throw new RuntimeException("Delete by Query is not suppoted for Key Ranges."); + } else { + query = ((Update.Where) query).and(QueryBuilder.lte(primaryKey, "?")); + } + } + objects.add(endKey); + } + } + } + if (startKey == null && endKey == null && key == null) { + return select.getQueryString(); + } + return query != null ? query.getQueryString() : null; + } + + private static String[] getColumnNames(CassandraMapping mapping, List<String> fields) { + ArrayList<String> columnNames = new ArrayList<>(); + for (String field : fields) { + Field fieldBean = mapping.getFieldFromFieldName(field); + CassandraKey cassandraKey = mapping.getCassandraKey(); + Field keyBean = null; + if (cassandraKey != null) { + keyBean = cassandraKey.getFieldFromFieldName(field); + } + if (fieldBean != null) { + columnNames.add(fieldBean.getColumnName()); + } else if (keyBean != null) { + columnNames.add(keyBean.getColumnName()); + } else { + LOG.warn("{} field is ignored, couldn't find relevant field in the persistent mapping", field); + } + } + return columnNames.toArray(new String[0]); + } + + private static String getPKey(List<Field> fields) { + for (Field field : fields) { + boolean isPrimaryKey = Boolean.parseBoolean(field.getProperty("primarykey")); + if (isPrimaryKey) { + return field.getColumnName(); + } + } + return null; + } + + /** + * This method returns CQL Qeury for DeleteByQuery method. + * refer: http://docs.datastax.com/en/cql/3.3/cql/cql_reference/cqlDelete.html + * + * @param mapping Cassandra Mapping {@link CassandraMapping} + * @param cassandraQuery Cassandra Query {@link CassandraQuery} + * @param objects field values + * @return CQL Query + */ + static String getDeleteByQuery(CassandraMapping mapping, Query cassandraQuery, List<Object> objects) { + String[] columns = null; + if (cassandraQuery.getFields() != null) { + columns = getColumnNames(mapping, Arrays.asList(cassandraQuery.getFields())); + } + Delete delete; + if (columns != null) { + delete = QueryBuilder.delete(columns).from(mapping.getKeySpace().getName(), mapping.getCoreName()); + } else { + delete = QueryBuilder.delete().from(mapping.getKeySpace().getName(), mapping.getCoreName()); + } + return processQuery(cassandraQuery, delete, mapping, objects); + } + + /** + * This method returns the CQL Query for UpdateByQuery method + * refer : http://docs.datastax.com/en/cql/3.3/cql/cql_reference/cqlUpdate.html + * + * @param mapping Cassandra mapping {@link CassandraMapping} + * @param cassandraQuery Cassandra Query {@link CassandraQuery} + * @param objects field Objects list + * @return CQL Query + */ + static String getUpdateByQueryForAvro(CassandraMapping mapping, Query cassandraQuery, List<Object> objects, Schema schema) { + Update update = QueryBuilder.update(mapping.getKeySpace().getName(), mapping.getCoreName()); + Update.Assignments updateAssignments = null; + if (cassandraQuery instanceof CassandraQuery) { + String[] columnNames = getColumnNames(mapping, Arrays.asList(cassandraQuery.getFields())); + for (String column : columnNames) { + updateAssignments = update.with(QueryBuilder.set(column, "?")); + Field field = mapping.getFieldFromColumnName(column); + Object value = ((CassandraQuery) cassandraQuery).getUpdateFieldValue(field.getFieldName()); + try { + Schema schemaField = schema.getField(field.getFieldName()).schema(); + objects.add(AvroCassandraUtils.getFieldValueFromAvroBean(schemaField, schemaField.getType(), value, field)); + } catch (NullPointerException e) { + throw new RuntimeException(field + " field couldn't find in the class " + mapping.getPersistentClass() + "."); + } + } + } else { + throw new RuntimeException("Please use Cassandra Query object to invoke, UpdateByQuery method."); + } + return processQuery(cassandraQuery, updateAssignments, mapping, objects); + } + + + /** + * This method returns the CQL Query for UpdateByQuery method + * refer : http://docs.datastax.com/en/cql/3.3/cql/cql_reference/cqlUpdate.html + * + * @param mapping Cassandra mapping {@link CassandraMapping} + * @param cassandraQuery Cassandra Query {@link CassandraQuery} + * @param objects field Objects list + * @return CQL Query + */ + static String getUpdateByQueryForNative(CassandraMapping mapping, Query cassandraQuery, List<Object> objects) { + Update update = QueryBuilder.update(mapping.getKeySpace().getName(), mapping.getCoreName()); + Update.Assignments updateAssignments = null; + if (cassandraQuery instanceof CassandraQuery) { + String[] columnNames = getColumnNames(mapping, Arrays.asList(cassandraQuery.getFields())); + for (String column : columnNames) { + updateAssignments = update.with(QueryBuilder.set(column, "?")); + objects.add(((CassandraQuery) cassandraQuery).getUpdateFieldValue(mapping.getFieldFromColumnName(column).getFieldName())); + } + } else { + throw new RuntimeException("Please use Cassandra Query object to invoke, UpdateByQuery method."); + } + return processQuery(cassandraQuery, updateAssignments, mapping, objects); + } + + + private static void populateFieldsToQuery(Schema schema, StringBuilder builder) throws Exception { + switch (schema.getType()) { + case INT: + builder.append("int"); + break; + case MAP: + builder.append("map<text,"); + populateFieldsToQuery(schema.getValueType(), builder); + builder.append(">"); + break; + case ARRAY: + builder.append("list<"); + populateFieldsToQuery(schema.getElementType(), builder); + builder.append(">"); + break; + case LONG: + builder.append("bigint"); + break; + case FLOAT: + builder.append("float"); + break; + case DOUBLE: + builder.append("double"); + break; + case BOOLEAN: + builder.append("boolean"); + break; + case BYTES: + builder.append("blob"); + break; + case RECORD: + builder.append("frozen<").append(schema.getName()).append(">"); + break; + case STRING: + case FIXED: + case ENUM: + builder.append("text"); + break; + case UNION: + for (Schema unionElementSchema : schema.getTypes()) { + if (unionElementSchema.getType().equals(Schema.Type.RECORD)) { + String recordName = unionElementSchema.getName(); + if (!builder.toString().contains(recordName)) { + builder.append("frozen<").append(recordName).append(">"); + } else { + LOG.warn("Same Field Type can't be mapped recursively. This is not supported with Cassandra UDT types, Please use byte dataType for recursive mapping."); + throw new Exception("Same Field Type has mapped recursively"); + } + break; + } else if (!unionElementSchema.getType().equals(Schema.Type.NULL)) { + populateFieldsToQuery(unionElementSchema, builder); + break; + } + } + break; + } + } + + static void processRecord(Schema recordSchema, StringBuilder stringBuilder) { + boolean isCommaNeeded = false; + for (Schema.Field field : recordSchema.getFields()) { + if (isCommaNeeded) { + stringBuilder.append(", "); + } + String fieldName = field.name(); + stringBuilder.append(fieldName).append(" "); + try { + populateFieldsToQuery(field.schema(), stringBuilder); + isCommaNeeded = true; + } catch (Exception e) { + int i = stringBuilder.indexOf(fieldName); + if (i != -1) { + stringBuilder.delete(i, i + fieldName.length()); + isCommaNeeded = false; + } + } + } + } + + static String getCreateUDTTypeForNative(CassandraMapping mapping, Class persistentClass, String udtType, String fieldName) throws NoSuchFieldException { + StringBuilder stringBuffer = new StringBuilder(); + Class udtClass = persistentClass.getDeclaredField(fieldName).getType(); + UDT annotation = (UDT) udtClass.getAnnotation(UDT.class); + if (annotation != null) { + stringBuffer.append("CREATE TYPE IF NOT EXISTS ").append(mapping.getKeySpace().getName()).append(".").append(udtType).append(" ("); + boolean isCommaNeeded = false; + for (java.lang.reflect.Field udtField : udtClass.getDeclaredFields()) { + com.datastax.driver.mapping.annotations.Field fieldAnnotation = udtField.getDeclaredAnnotation(com.datastax.driver.mapping.annotations.Field.class); + if (fieldAnnotation != null) { + if (isCommaNeeded) { + stringBuffer.append(", "); + } + if (!fieldAnnotation.name().isEmpty()) { + stringBuffer.append(fieldAnnotation.name()).append(" "); + } else { + stringBuffer.append(udtField.getName()).append(" "); + } + stringBuffer.append(dataType(udtField, null)); + isCommaNeeded = true; + } + } + stringBuffer.append(")"); + } else { + throw new RuntimeException(""); + } + return stringBuffer.toString(); + } + + static String getCreateUDTTypeForAvro(CassandraMapping mapping, String udtType, Schema fieldSchema) { + StringBuilder stringBuffer = new StringBuilder(); + stringBuffer.append("CREATE TYPE IF NOT EXISTS ").append(mapping.getKeySpace().getName()).append(".").append(udtType).append(" ("); + CassandraQueryFactory.processRecord(fieldSchema, stringBuffer); + stringBuffer.append(")"); + return stringBuffer.toString(); + } + + private static String dataType(java.lang.reflect.Field field, Type fieldType) { + String type; + if (field != null) { + type = field.getType().getName(); + } else { + type = fieldType.getTypeName(); + } + String dataType; + String s = type; + if (s.equals("java.lang.String") || s.equals("java.lang.CharSequence")) { + dataType = "text"; + } else if (s.equals("int") || s.equals("java.lang.Integer")) { + dataType = "int"; + } else if (s.equals("double") || s.equals("java.lang.Double")) { + dataType = "double"; + } else if (s.equals("float") || s.equals("java.lang.Float")) { + dataType = "float"; + } else if (s.equals("boolean") || s.equals("java.lang.Boolean")) { + dataType = "boolean"; + } else if (s.equals("java.util.UUID")) { + dataType = "uuid"; + } else if (s.equals("java.lang.Long")) { + dataType = "bigint"; + } else if (s.equals("java.math.BigDecimal")) { + dataType = "decimal"; + } else if (s.equals("java.net.InetAddress")) { + dataType = "inet"; + } else if (s.equals("java.math.BigInteger")) { + dataType = "varint"; + } else if (s.equals("java.nio.ByteBuffer")) { + dataType = "blob"; + } else if (s.contains("Map")) { + ParameterizedType mapType; + if (field != null) { + mapType = (ParameterizedType) field.getGenericType(); + } else { + mapType = (ParameterizedType) fieldType; + } + Type value1 = mapType.getActualTypeArguments()[0]; + Type value2 = mapType.getActualTypeArguments()[1]; + dataType = "map<" + dataType(null, value1) + "," + dataType(null, value2) + ">"; + } else if (s.contains("List")) { + ParameterizedType listType; + if (field != null) { + listType = (ParameterizedType) field.getGenericType(); + } else { + listType = (ParameterizedType) fieldType; + } + Type value = listType.getActualTypeArguments()[0]; + dataType = "list<" + dataType(null, value) + ">"; + } else if (s.contains("Set")) { + ParameterizedType setType; + if (field != null) { + setType = (ParameterizedType) field.getGenericType(); + } else { + setType = (ParameterizedType) fieldType; + } + Type value = setType.getActualTypeArguments()[0]; + dataType = "set<" + dataType(null, value) + ">"; + } else { + throw new RuntimeException("Unsupported Cassandra DataType"); + } + return dataType; + } + +}
http://git-wip-us.apache.org/repos/asf/gora/blob/89683c74/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/CassandraSerializer.java ---------------------------------------------------------------------- diff --git a/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/CassandraSerializer.java b/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/CassandraSerializer.java new file mode 100644 index 0000000..208428e --- /dev/null +++ b/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/CassandraSerializer.java @@ -0,0 +1,225 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gora.cassandra.serializers; + +import com.datastax.driver.core.ConsistencyLevel; +import com.datastax.driver.core.KeyspaceMetadata; +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.SimpleStatement; +import com.datastax.driver.core.TableMetadata; +import org.apache.gora.cassandra.bean.Field; +import org.apache.gora.cassandra.store.CassandraClient; +import org.apache.gora.cassandra.store.CassandraMapping; +import org.apache.gora.cassandra.store.CassandraStore; +import org.apache.gora.persistency.Persistent; +import org.apache.gora.query.Query; +import org.apache.gora.query.Result; +import org.apache.gora.store.DataStore; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.Locale; +import java.util.Map; + +/** + * This is the abstract Cassandra Serializer class. + */ +public abstract class CassandraSerializer<K, T extends Persistent> { + + private static final Logger LOG = LoggerFactory.getLogger(CassandraStore.class); + + protected Class<K> keyClass; + + protected Class<T> persistentClass; + + protected CassandraMapping mapping; + + protected CassandraClient client; + + protected String readConsistencyLevel; + + protected String writeConsistencyLevel; + + protected Map<String, String> userDefineTypeMaps; + + CassandraSerializer(CassandraClient cc, Class<K> keyClass, Class<T> persistantClass, CassandraMapping mapping) { + this.keyClass = keyClass; + this.persistentClass = persistantClass; + this.client = cc; + this.mapping = mapping; + this.readConsistencyLevel = client.getReadConsistencyLevel(); + this.writeConsistencyLevel = client.getWriteConsistencyLevel(); + } + + /** + * This method returns the Cassandra Serializer according the Cassandra serializer property. + * + * @param cc Cassandra Client + * @param type Serialization type + * @param dataStore Cassandra DataStore + * @param mapping Cassandra Mapping + * @param <K> key class + * @param <T> persistent class + * @return Serializer + */ + public static <K, T extends Persistent> CassandraSerializer getSerializer(CassandraClient cc, String type, final DataStore<K, T> dataStore, CassandraMapping mapping) { + CassandraStore.SerializerType serType = type == null || type.isEmpty() ? CassandraStore.SerializerType.NATIVE : CassandraStore.SerializerType.valueOf(type.toUpperCase(Locale.ENGLISH)); + CassandraSerializer serializer; + switch (serType) { + case AVRO: + serializer = new AvroSerializer(cc, dataStore, mapping); + break; + case NATIVE: + default: + serializer = new NativeSerializer(cc, dataStore.getKeyClass(), dataStore.getPersistentClass(), mapping); + } + return serializer; + } + + /** + * In this method persistent class been analyzed to find inner records with UDT type, this method should call in every Cassandra serialization Constructor. + * + * @throws Exception + */ + protected abstract void analyzePersistent() throws Exception; + + + public void createSchema() { + LOG.debug("creating Cassandra keyspace {}", mapping.getKeySpace().getName()); + this.client.getSession().execute(CassandraQueryFactory.getCreateKeySpaceQuery(mapping)); + for (Map.Entry udtType : userDefineTypeMaps.entrySet()) { + LOG.debug("creating Cassandra User Define Type {}", udtType.getKey()); + this.client.getSession().execute((String) udtType.getValue()); + } + LOG.debug("creating Cassandra column family / table {}", mapping.getCoreName()); + this.client.getSession().execute(CassandraQueryFactory.getCreateTableQuery(mapping)); + } + + public void deleteSchema() { + LOG.debug("dropping Cassandra table {}", mapping.getCoreName()); + this.client.getSession().execute(CassandraQueryFactory.getDropTableQuery(mapping)); + LOG.debug("dropping Cassandra keyspace {}", mapping.getKeySpace().getName()); + this.client.getSession().execute(CassandraQueryFactory.getDropKeySpaceQuery(mapping)); + } + + public void close() { + this.client.close(); + } + + public void truncateSchema() { + LOG.debug("truncating Cassandra table {}", mapping.getCoreName()); + this.client.getSession().execute(CassandraQueryFactory.getTruncateTableQuery(mapping)); + } + + public boolean schemaExists() { + KeyspaceMetadata keyspace = this.client.getCluster().getMetadata().getKeyspace(mapping.getKeySpace().getName()); + if (keyspace != null) { + TableMetadata table = keyspace.getTable(mapping.getCoreName()); + return table != null; + } else { + return false; + } + } + + protected String[] getFields() { + List<String> fields = new ArrayList<>(); + for (Field field : mapping.getFieldList()) { + fields.add(field.getFieldName()); + } + return fields.toArray(new String[0]); + } + + /** + * Inserts the persistent Object + * + * @param key key value + * @param value persistent value + */ + public abstract void put(K key, T value); + + /** + * Retrieves the persistent value according to the key + * + * @param key key value + * @return persistent value + */ + public abstract T get(K key); + + /** + * Deletes persistent value according to the key + * + * @param key key value + * @return isDeleted + */ + public abstract boolean delete(K key); + + /** + * Retrieves the persistent value according to the key and fields + * + * @param key key value + * @param fields fields + * @return persistent value + */ + public abstract T get(K key, String[] fields); + + /** + * Executes the given query and returns the results. + * + * @param dataStore Cassandra data store + * @param query Cassandra Query + * @return Cassandra Result + */ + public abstract Result<K, T> execute(DataStore<K, T> dataStore, Query<K, T> query); + + /** + * Update the persistent objects + * + * @param query Cassandra Query + * @return isUpdated + */ + public abstract boolean updateByQuery(Query query); + + public long deleteByQuery(Query query) { + List<Object> objectArrayList = new ArrayList<>(); + if (query.getKey() == null && query.getEndKey() == null && query.getStartKey() == null) { + if (query.getFields() == null) { + client.getSession().execute(CassandraQueryFactory.getTruncateTableQuery(mapping)); + } else { + LOG.error("Delete by Query is not supported for the Queries which didn't specify Query keys with fields."); + } + } else { + String cqlQuery = CassandraQueryFactory.getDeleteByQuery(mapping, query, objectArrayList); + ResultSet results; + SimpleStatement statement; + if (objectArrayList.size() == 0) { + statement = new SimpleStatement(cqlQuery); + } else { + statement = new SimpleStatement(cqlQuery, objectArrayList.toArray()); + } + if (writeConsistencyLevel != null) { + statement.setConsistencyLevel(ConsistencyLevel.valueOf(writeConsistencyLevel)); + } + results = client.getSession().execute(statement); + LOG.debug("Delete by Query was applied : " + results.wasApplied()); + } + LOG.info("Delete By Query method doesn't return the deleted element count."); + return 0; + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/gora/blob/89683c74/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/NativeSerializer.java ---------------------------------------------------------------------- diff --git a/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/NativeSerializer.java b/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/NativeSerializer.java new file mode 100644 index 0000000..bf28ee0 --- /dev/null +++ b/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/NativeSerializer.java @@ -0,0 +1,243 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gora.cassandra.serializers; + +import com.datastax.driver.core.ConsistencyLevel; +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.SimpleStatement; +import com.datastax.driver.mapping.Mapper; +import com.datastax.driver.mapping.MappingManager; +import com.datastax.driver.mapping.Result; +import org.apache.commons.lang.ArrayUtils; +import org.apache.gora.cassandra.bean.Field; +import org.apache.gora.cassandra.query.CassandraResultSet; +import org.apache.gora.cassandra.store.CassandraClient; +import org.apache.gora.cassandra.store.CassandraMapping; +import org.apache.gora.persistency.Persistent; +import org.apache.gora.query.Query; +import org.apache.gora.store.DataStore; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; + +/** + * This Class contains the operation relates to Native Serialization. + */ +class NativeSerializer<K, T extends Persistent> extends CassandraSerializer { + + private static final Logger LOG = LoggerFactory.getLogger(NativeSerializer.class); + + private Mapper<T> mapper; + + NativeSerializer(CassandraClient cassandraClient, Class<K> keyClass, Class<T> persistentClass, CassandraMapping mapping) { + super(cassandraClient, keyClass, persistentClass, mapping); + try { + analyzePersistent(); + } catch (Exception e) { + throw new RuntimeException("Error occurred while analyzing the persistent class, :" + e.getMessage()); + } + this.createSchema(); + MappingManager mappingManager = new MappingManager(cassandraClient.getSession()); + mapper = mappingManager.mapper(persistentClass); + if (cassandraClient.getWriteConsistencyLevel() != null) { + mapper.setDefaultDeleteOptions(Mapper.Option.consistencyLevel(ConsistencyLevel.valueOf(cassandraClient.getWriteConsistencyLevel()))); + mapper.setDefaultSaveOptions(Mapper.Option.consistencyLevel(ConsistencyLevel.valueOf(cassandraClient.getWriteConsistencyLevel()))); + } + if (cassandraClient.getReadConsistencyLevel() != null) { + mapper.setDefaultGetOptions(Mapper.Option.consistencyLevel(ConsistencyLevel.valueOf(cassandraClient.getReadConsistencyLevel()))); + } + } + + /** + * {@inheritDoc} + * + * @param key + * @param value + */ + @Override + public void put(Object key, Persistent value) { + LOG.debug("Object is saved with key : {} and value : {}", key, value); + mapper.save((T) value); + } + + /** + * {@inheritDoc} + * + * @param key + * @return + */ + @Override + public T get(Object key) { + T object = mapper.get(key); + if (object != null) { + LOG.debug("Object is found for key : {}", key); + } else { + LOG.debug("Object is not found for key : {}", key); + } + return object; + } + + /** + * {@inheritDoc} + * + * @param key + * @return + */ + @Override + public boolean delete(Object key) { + LOG.debug("Object is deleted for key : {}", key); + mapper.delete(key); + return true; + } + + /** + * {@inheritDoc} + * + * @param key + * @param fields + * @return + */ + @Override + public Persistent get(Object key, String[] fields) { + if (fields == null) { + fields = getFields(); + } + String cqlQuery = CassandraQueryFactory.getSelectObjectWithFieldsQuery(mapping, fields); + SimpleStatement statement = new SimpleStatement(cqlQuery, key); + if (readConsistencyLevel != null) { + statement.setConsistencyLevel(ConsistencyLevel.valueOf(readConsistencyLevel)); + } + ResultSet results = client.getSession().execute(statement); + Result<T> objects = mapper.map(results); + List<T> objectList = objects.all(); + if (objectList != null) { + LOG.debug("Object is found for key : {}", key); + return objectList.get(0); + } + LOG.debug("Object is not found for key : {}", key); + return null; + } + + /** + * {@inheritDoc} + * + * @throws Exception + */ + @Override + protected void analyzePersistent() throws Exception { + userDefineTypeMaps = new HashMap<>(); + for (Field field : mapping.getFieldList()) { + String fieldType = field.getType(); + if (fieldType.contains("frozen")) { + String udtType = fieldType.substring(fieldType.indexOf("<") + 1, fieldType.indexOf(">")); + String createQuery = CassandraQueryFactory.getCreateUDTTypeForNative(mapping, persistentClass, udtType, field.getFieldName()); + userDefineTypeMaps.put(udtType, createQuery); + } + } + } + + /** + * {@inheritDoc} + * + * @param query + * @return + */ + @Override + public boolean updateByQuery(Query query) { + List<Object> objectArrayList = new ArrayList<>(); + String cqlQuery = CassandraQueryFactory.getUpdateByQueryForNative(mapping, query, objectArrayList); + ResultSet results; + SimpleStatement statement; + if (objectArrayList.size() == 0) { + statement = new SimpleStatement(cqlQuery); + } else { + statement = new SimpleStatement(cqlQuery, objectArrayList.toArray()); + } + if (writeConsistencyLevel != null) { + statement.setConsistencyLevel(ConsistencyLevel.valueOf(writeConsistencyLevel)); + } + results = client.getSession().execute(statement); + return results.wasApplied(); + } + + /** + * {@inheritDoc} + * + * @param dataStore + * @param query + * @return + */ + @Override + public org.apache.gora.query.Result execute(DataStore dataStore, Query query) { + List<Object> objectArrayList = new ArrayList<>(); + String[] fields = query.getFields(); + if (fields != null) { + fields = (String[]) ArrayUtils.addAll(fields, mapping.getAllKeys()); + } else { + fields = mapping.getAllFieldsIncludingKeys(); + } + CassandraResultSet<K, T> cassandraResult = new CassandraResultSet<>(dataStore, query); + String cqlQuery = CassandraQueryFactory.getExecuteQuery(mapping, query, objectArrayList, fields); + ResultSet results; + if (objectArrayList.size() == 0) { + results = client.getSession().execute(cqlQuery); + } else { + results = client.getSession().execute(cqlQuery, objectArrayList.toArray()); + } + Result<T> objects = mapper.map(results); + Iterator iterator = objects.iterator(); + while (iterator.hasNext()) { + T result = (T) iterator.next(); + K key = getKey(result); + cassandraResult.addResultElement(key, result); + } + return cassandraResult; + } + + private K getKey(T object) { + String keyField = null; + for (Field field : mapping.getFieldList()) { + boolean isPrimaryKey = Boolean.parseBoolean(field.getProperty("primarykey")); + if (isPrimaryKey) { + keyField = field.getFieldName(); + break; + } + } + K key; + Method keyMethod = null; + try { + for (Method method : this.persistentClass.getMethods()) { + if (method.getName().equalsIgnoreCase("get" + keyField)) { + keyMethod = method; + } + } + key = (K) keyMethod.invoke(object); + } catch (Exception e) { + try { + key = (K) this.persistentClass.getField(keyField).get(object); + } catch (Exception e1) { + throw new RuntimeException("Field" + keyField + " is not accessible in " + persistentClass + " : " + e1.getMessage()); + } + } + return key; + } +} http://git-wip-us.apache.org/repos/asf/gora/blob/89683c74/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/package-info.java ---------------------------------------------------------------------- diff --git a/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/package-info.java b/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/package-info.java new file mode 100644 index 0000000..ce1e3e7 --- /dev/null +++ b/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/package-info.java @@ -0,0 +1,21 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * This package contains Cassandra store related util classes for serializer. + */ +package org.apache.gora.cassandra.serializers; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/gora/blob/89683c74/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java ---------------------------------------------------------------------- diff --git a/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java b/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java new file mode 100644 index 0000000..f672884 --- /dev/null +++ b/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java @@ -0,0 +1,535 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gora.cassandra.store; + +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.ConsistencyLevel; +import com.datastax.driver.core.DataType; +import com.datastax.driver.core.HostDistance; +import com.datastax.driver.core.PoolingOptions; +import com.datastax.driver.core.ProtocolOptions; +import com.datastax.driver.core.ProtocolVersion; +import com.datastax.driver.core.QueryOptions; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.SocketOptions; +import com.datastax.driver.core.TypeCodec; +import com.datastax.driver.core.policies.ConstantReconnectionPolicy; +import com.datastax.driver.core.policies.DCAwareRoundRobinPolicy; +import com.datastax.driver.core.policies.DefaultRetryPolicy; +import com.datastax.driver.core.policies.DowngradingConsistencyRetryPolicy; +import com.datastax.driver.core.policies.ExponentialReconnectionPolicy; +import com.datastax.driver.core.policies.FallthroughRetryPolicy; +import com.datastax.driver.core.policies.LatencyAwarePolicy; +import com.datastax.driver.core.policies.LoggingRetryPolicy; +import com.datastax.driver.core.policies.RoundRobinPolicy; +import com.datastax.driver.core.policies.TokenAwarePolicy; +import com.datastax.driver.extras.codecs.arrays.DoubleArrayCodec; +import com.datastax.driver.extras.codecs.arrays.FloatArrayCodec; +import com.datastax.driver.extras.codecs.arrays.IntArrayCodec; +import com.datastax.driver.extras.codecs.arrays.LongArrayCodec; +import com.datastax.driver.extras.codecs.arrays.ObjectArrayCodec; +import com.datastax.driver.extras.codecs.date.SimpleDateCodec; +import com.datastax.driver.extras.codecs.date.SimpleTimestampCodec; +import com.datastax.driver.extras.codecs.jdk8.OptionalCodec; +import org.apache.gora.cassandra.bean.Field; +import org.jdom.Document; +import org.jdom.Element; +import org.jdom.JDOMException; +import org.jdom.input.SAXBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Locale; +import java.util.Properties; + +/** + * This class provides the Cassandra Client Connection. + * Initialize the Cassandra Connection according to the Properties. + */ +public class CassandraClient { + + private static final Logger LOG = LoggerFactory.getLogger(CassandraClient.class); + + private Cluster cluster; + + private Session session; + + private CassandraMapping mapping; + + private String readConsistencyLevel; + + private String writeConsistencyLevel; + + public Session getSession() { + return session; + } + + public Cluster getCluster() { + return cluster; + } + + void initialize(Properties properties, CassandraMapping mapping) throws Exception { + Cluster.Builder builder = Cluster.builder(); + List<String> codecs = readCustomCodec(properties); + builder = populateSettings(builder, properties); + this.mapping = mapping; + this.cluster = builder.build(); + if (codecs != null) { + registerCustomCodecs(codecs); + } + readConsistencyLevel = properties.getProperty(CassandraStoreParameters.READ_CONSISTENCY_LEVEL); + writeConsistencyLevel = properties.getProperty(CassandraStoreParameters.WRITE_CONSISTENCY_LEVEL); + registerOptionalCodecs(); + this.session = this.cluster.connect(); + } + + private void registerOptionalCodecs() { + // Optional Codecs for natives + this.cluster.getConfiguration().getCodecRegistry().register(new OptionalCodec<>(TypeCodec.ascii())); + this.cluster.getConfiguration().getCodecRegistry().register(new OptionalCodec<>(TypeCodec.bigint())); + this.cluster.getConfiguration().getCodecRegistry().register(new OptionalCodec<>(TypeCodec.blob())); + this.cluster.getConfiguration().getCodecRegistry().register(new OptionalCodec<>(TypeCodec.cboolean())); + this.cluster.getConfiguration().getCodecRegistry().register(new OptionalCodec<>(TypeCodec.cdouble())); + this.cluster.getConfiguration().getCodecRegistry().register(new OptionalCodec<>(TypeCodec.cfloat())); + this.cluster.getConfiguration().getCodecRegistry().register(new OptionalCodec<>(TypeCodec.cint())); + this.cluster.getConfiguration().getCodecRegistry().register(new OptionalCodec<>(TypeCodec.counter())); + this.cluster.getConfiguration().getCodecRegistry().register(new OptionalCodec<>(TypeCodec.date())); + this.cluster.getConfiguration().getCodecRegistry().register(new OptionalCodec<>(TypeCodec.decimal())); + this.cluster.getConfiguration().getCodecRegistry().register(new OptionalCodec<>(TypeCodec.inet())); + this.cluster.getConfiguration().getCodecRegistry().register(new OptionalCodec<>(TypeCodec.smallInt())); + this.cluster.getConfiguration().getCodecRegistry().register(new OptionalCodec<>(TypeCodec.time())); + this.cluster.getConfiguration().getCodecRegistry().register(new OptionalCodec<>(TypeCodec.timestamp())); + this.cluster.getConfiguration().getCodecRegistry().register(new OptionalCodec<>(TypeCodec.timeUUID())); + this.cluster.getConfiguration().getCodecRegistry().register(new OptionalCodec<>(TypeCodec.tinyInt())); + this.cluster.getConfiguration().getCodecRegistry().register(new OptionalCodec<>(TypeCodec.varint())); + this.cluster.getConfiguration().getCodecRegistry().register(new OptionalCodec<>(TypeCodec.varchar())); + this.cluster.getConfiguration().getCodecRegistry().register(new OptionalCodec<>(TypeCodec.uuid())); + // Optional Array Codecs + this.cluster.getConfiguration().getCodecRegistry().register(new IntArrayCodec()); + this.cluster.getConfiguration().getCodecRegistry().register(new DoubleArrayCodec()); + this.cluster.getConfiguration().getCodecRegistry().register(new FloatArrayCodec()); + this.cluster.getConfiguration().getCodecRegistry().register(new LongArrayCodec()); + this.cluster.getConfiguration().getCodecRegistry().register(new ObjectArrayCodec<>( + DataType.list(DataType.varchar()), + String[].class, + TypeCodec.varchar())); + // Optional Time Codecs + this.cluster.getConfiguration().getCodecRegistry().register(new SimpleDateCodec()); + this.cluster.getConfiguration().getCodecRegistry().register(new SimpleTimestampCodec()); + + for (Field field : this.mapping.getFieldList()) { + String columnType = field.getType().toLowerCase(Locale.ENGLISH); + //http://docs.datastax.com/en/cql/3.3/cql/cql_reference/cql_data_types_c.html + if (columnType.contains("list")) { + columnType = columnType.substring(columnType.indexOf("<") + 1, columnType.indexOf(">")); + this.cluster.getConfiguration().getCodecRegistry().register(new OptionalCodec<>(TypeCodec.list(getTypeCodec(columnType)))); + } else if (columnType.contains("set")) { + columnType = columnType.substring(columnType.indexOf("<") + 1, columnType.indexOf(">")); + this.cluster.getConfiguration().getCodecRegistry().register(new OptionalCodec<>(TypeCodec.set(getTypeCodec(columnType)))); + } else if (columnType.contains("map")) { + String[] columnTypes = columnType.substring(columnType.indexOf("<") + 1, columnType.indexOf(">")).split(","); + this.cluster.getConfiguration().getCodecRegistry().register(new OptionalCodec<>(TypeCodec.map(TypeCodec.set(getTypeCodec(columnTypes[0])), TypeCodec.set(getTypeCodec(columnTypes[1]))))); + } + } + } + + private TypeCodec getTypeCodec(String columnType) { + TypeCodec typeCodec; + switch (columnType) { + case "ascii": + typeCodec = TypeCodec.ascii(); + break; + case "bigint": + typeCodec = TypeCodec.bigint(); + break; + case "blob": + typeCodec = TypeCodec.blob(); + break; + case "boolean": + typeCodec = TypeCodec.cboolean(); + break; + case "counter": + typeCodec = TypeCodec.counter(); + break; + case "date": + typeCodec = TypeCodec.date(); + break; + case "decimal": + typeCodec = TypeCodec.decimal(); + break; + case "double": + typeCodec = TypeCodec.cdouble(); + break; + case "float": + typeCodec = TypeCodec.cfloat(); + break; + case "inet": + typeCodec = TypeCodec.inet(); + break; + case "int": + typeCodec = TypeCodec.cint(); + break; + case "smallint": + typeCodec = TypeCodec.smallInt(); + break; + case "time": + typeCodec = TypeCodec.time(); + break; + case "timestamp": + typeCodec = TypeCodec.timestamp(); + break; + case "timeuuid": + typeCodec = TypeCodec.timeUUID(); + break; + case "tinyint": + typeCodec = TypeCodec.tinyInt(); + break; + case "uuid": + typeCodec = TypeCodec.uuid(); + break; + case "varint": + typeCodec = TypeCodec.varint(); + break; + case "varchar": + case "text": + typeCodec = TypeCodec.varchar(); + break; + default: + LOG.error("Unsupported Cassandra datatype: {} ", columnType); + throw new RuntimeException("Unsupported Cassandra datatype: " + columnType); + } + return typeCodec; + } + + private Cluster.Builder populateSettings(Cluster.Builder builder, Properties properties) { + String serversParam = properties.getProperty(CassandraStoreParameters.CASSANDRA_SERVERS); + String[] servers = serversParam.split(","); + for (String server : servers) { + builder = builder.addContactPoint(server); + } + String portProp = properties.getProperty(CassandraStoreParameters.PORT); + if (portProp != null) { + builder = builder.withPort(Integer.parseInt(portProp)); + } + String clusterNameProp = properties.getProperty(CassandraStoreParameters.CLUSTER_NAME); + if (clusterNameProp != null) { + builder = builder.withClusterName(clusterNameProp); + } + String compressionProp = properties.getProperty(CassandraStoreParameters.COMPRESSION); + if (compressionProp != null) { + builder = builder.withCompression(ProtocolOptions.Compression.valueOf(compressionProp)); + } + builder = this.populateCredentials(properties, builder); + builder = this.populateLoadBalancingProp(properties, builder); + String enableJMXProp = properties.getProperty(CassandraStoreParameters.ENABLE_JMX_REPORTING); + if (!Boolean.parseBoolean(enableJMXProp)) { + builder = builder.withoutJMXReporting(); + } + String enableMetricsProp = properties.getProperty(CassandraStoreParameters.ENABLE_METRICS); + if (!Boolean.parseBoolean(enableMetricsProp)) { + builder = builder.withoutMetrics(); + } + builder = this.populatePoolingSettings(properties, builder); + String versionProp = properties.getProperty(CassandraStoreParameters.PROTOCOL_VERSION); + if (versionProp != null) { + builder = builder.withProtocolVersion(ProtocolVersion.fromInt(Integer.parseInt(versionProp))); + } + builder = this.populateQueryOptions(properties, builder); + builder = this.populateReconnectPolicy(properties, builder); + builder = this.populateRetrytPolicy(properties, builder); + builder = this.populateSocketOptions(properties, builder); + String enableSSLProp = properties.getProperty(CassandraStoreParameters.ENABLE_SSL); + if (enableSSLProp != null) { + if (Boolean.parseBoolean(enableSSLProp)) { + builder = builder.withSSL(); + } + } + return builder; + } + + private Cluster.Builder populateLoadBalancingProp(Properties properties, Cluster.Builder builder) { + String loadBalancingProp = properties.getProperty(CassandraStoreParameters.LOAD_BALANCING_POLICY); + if (loadBalancingProp != null) { + switch (loadBalancingProp) { + case "LatencyAwareRoundRobinPolicy": + builder = builder.withLoadBalancingPolicy(LatencyAwarePolicy.builder(new RoundRobinPolicy()).build()); + break; + case "RoundRobinPolicy": + builder = builder.withLoadBalancingPolicy(new RoundRobinPolicy()); + break; + case "DCAwareRoundRobinPolicy": { + String dataCenter = properties.getProperty(CassandraStoreParameters.DATA_CENTER); + boolean allowRemoteDCsForLocalConsistencyLevel = Boolean.parseBoolean( + properties.getProperty(CassandraStoreParameters.ALLOW_REMOTE_DCS_FOR_LOCAL_CONSISTENCY_LEVEL)); + if (dataCenter != null && !dataCenter.isEmpty()) { + if (allowRemoteDCsForLocalConsistencyLevel) { + builder = builder.withLoadBalancingPolicy( + DCAwareRoundRobinPolicy.builder().withLocalDc(dataCenter) + .allowRemoteDCsForLocalConsistencyLevel().build()); + } else { + builder = builder.withLoadBalancingPolicy( + DCAwareRoundRobinPolicy.builder().withLocalDc(dataCenter).build()); + } + } else { + if (allowRemoteDCsForLocalConsistencyLevel) { + builder = builder.withLoadBalancingPolicy( + (DCAwareRoundRobinPolicy.builder().allowRemoteDCsForLocalConsistencyLevel().build())); + } else { + builder = builder.withLoadBalancingPolicy((DCAwareRoundRobinPolicy.builder().build())); + } + } + break; + } + case "TokenAwareRoundRobinPolicy": + builder = builder.withLoadBalancingPolicy(new TokenAwarePolicy(new RoundRobinPolicy())); + break; + case "TokenAwareDCAwareRoundRobinPolicy": { + String dataCenter = properties.getProperty(CassandraStoreParameters.DATA_CENTER); + boolean allowRemoteDCsForLocalConsistencyLevel = Boolean.parseBoolean( + properties.getProperty(CassandraStoreParameters.ALLOW_REMOTE_DCS_FOR_LOCAL_CONSISTENCY_LEVEL)); + if (dataCenter != null && !dataCenter.isEmpty()) { + if (allowRemoteDCsForLocalConsistencyLevel) { + builder = builder.withLoadBalancingPolicy(new TokenAwarePolicy( + DCAwareRoundRobinPolicy.builder().withLocalDc(dataCenter) + .allowRemoteDCsForLocalConsistencyLevel().build())); + } else { + builder = builder.withLoadBalancingPolicy(new TokenAwarePolicy( + DCAwareRoundRobinPolicy.builder().withLocalDc(dataCenter).build())); + } + } else { + if (allowRemoteDCsForLocalConsistencyLevel) { + builder = builder.withLoadBalancingPolicy(new TokenAwarePolicy( + DCAwareRoundRobinPolicy.builder().allowRemoteDCsForLocalConsistencyLevel().build())); + } else { + builder = builder.withLoadBalancingPolicy( + new TokenAwarePolicy(DCAwareRoundRobinPolicy.builder().build())); + } + } + break; + } + default: + LOG.error("Unsupported Cassandra load balancing policy: {} ", loadBalancingProp); + break; + } + } + return builder; + } + + private Cluster.Builder populateCredentials(Properties properties, Cluster.Builder builder) { + String usernameProp = properties.getProperty(CassandraStoreParameters.USERNAME); + String passwordProp = properties.getProperty(CassandraStoreParameters.PASSWORD); + if (usernameProp != null) { + builder = builder.withCredentials(usernameProp, passwordProp); + } + return builder; + } + + private Cluster.Builder populatePoolingSettings(Properties properties, Cluster.Builder builder) { + String localCoreConnectionsPerHost = properties.getProperty(CassandraStoreParameters.LOCAL_CORE_CONNECTIONS_PER_HOST); + String remoteCoreConnectionsPerHost = properties.getProperty(CassandraStoreParameters.REMOTE_CORE_CONNECTIONS_PER_HOST); + String localMaxConnectionsPerHost = properties.getProperty(CassandraStoreParameters.LOCAL_MAX_CONNECTIONS_PER_HOST); + String remoteMaxConnectionsPerHost = properties.getProperty(CassandraStoreParameters.REMOTE_MAX_CONNECTIONS_PER_HOST); + String localNewConnectionThreshold = properties.getProperty(CassandraStoreParameters.LOCAL_NEW_CONNECTION_THRESHOLD); + String remoteNewConnectionThreshold = properties.getProperty(CassandraStoreParameters.REMOTE_NEW_CONNECTION_THRESHOLD); + String localMaxRequestsPerConnection = properties.getProperty(CassandraStoreParameters.LOCAL_MAX_REQUESTS_PER_CONNECTION); + String remoteMaxRequestsPerConnection = properties.getProperty(CassandraStoreParameters.REMOTE_MAX_REQUESTS_PER_CONNECTION); + PoolingOptions options = new PoolingOptions(); + if (localCoreConnectionsPerHost != null) { + options.setCoreConnectionsPerHost(HostDistance.LOCAL, Integer.parseInt(localCoreConnectionsPerHost)); + } + if (remoteCoreConnectionsPerHost != null) { + options.setCoreConnectionsPerHost(HostDistance.REMOTE, Integer.parseInt(remoteCoreConnectionsPerHost)); + } + if (localMaxConnectionsPerHost != null) { + options.setMaxConnectionsPerHost(HostDistance.LOCAL, Integer.parseInt(localMaxConnectionsPerHost)); + } + if (remoteMaxConnectionsPerHost != null) { + options.setMaxConnectionsPerHost(HostDistance.REMOTE, Integer.parseInt(remoteMaxConnectionsPerHost)); + } + if (localNewConnectionThreshold != null) { + options.setNewConnectionThreshold(HostDistance.LOCAL, Integer.parseInt(localNewConnectionThreshold)); + } + if (remoteNewConnectionThreshold != null) { + options.setNewConnectionThreshold(HostDistance.REMOTE, Integer.parseInt(remoteNewConnectionThreshold)); + } + if (localMaxRequestsPerConnection != null) { + options.setMaxRequestsPerConnection(HostDistance.LOCAL, Integer.parseInt(localMaxRequestsPerConnection)); + } + if (remoteMaxRequestsPerConnection != null) { + options.setMaxRequestsPerConnection(HostDistance.REMOTE, Integer.parseInt(remoteMaxRequestsPerConnection)); + } + builder = builder.withPoolingOptions(options); + return builder; + } + + private Cluster.Builder populateQueryOptions(Properties properties, Cluster.Builder builder) { + String consistencyLevelProp = properties.getProperty(CassandraStoreParameters.CONSISTENCY_LEVEL); + String serialConsistencyLevelProp = properties.getProperty(CassandraStoreParameters.SERIAL_CONSISTENCY_LEVEL); + String fetchSize = properties.getProperty(CassandraStoreParameters.FETCH_SIZE); + QueryOptions options = new QueryOptions(); + if (consistencyLevelProp != null) { + options.setConsistencyLevel(ConsistencyLevel.valueOf(consistencyLevelProp)); + } + if (serialConsistencyLevelProp != null) { + options.setSerialConsistencyLevel(ConsistencyLevel.valueOf(serialConsistencyLevelProp)); + } + if (fetchSize != null) { + options.setFetchSize(Integer.parseInt(fetchSize)); + } + return builder.withQueryOptions(options); + } + + private Cluster.Builder populateReconnectPolicy(Properties properties, Cluster.Builder builder) { + String reconnectionPolicy = properties.getProperty(CassandraStoreParameters.RECONNECTION_POLICY); + if (reconnectionPolicy != null) { + switch (reconnectionPolicy) { + case "ConstantReconnectionPolicy": { + String constantReconnectionPolicyDelay = properties.getProperty(CassandraStoreParameters.CONSTANT_RECONNECTION_POLICY_DELAY); + ConstantReconnectionPolicy policy = new ConstantReconnectionPolicy(Long.parseLong(constantReconnectionPolicyDelay)); + builder = builder.withReconnectionPolicy(policy); + break; + } + case "ExponentialReconnectionPolicy": { + String exponentialReconnectionPolicyBaseDelay = properties.getProperty(CassandraStoreParameters.EXPONENTIAL_RECONNECTION_POLICY_BASE_DELAY); + String exponentialReconnectionPolicyMaxDelay = properties.getProperty(CassandraStoreParameters.EXPONENTIAL_RECONNECTION_POLICY_MAX_DELAY); + + ExponentialReconnectionPolicy policy = new ExponentialReconnectionPolicy(Long.parseLong(exponentialReconnectionPolicyBaseDelay), + Long.parseLong(exponentialReconnectionPolicyMaxDelay)); + builder = builder.withReconnectionPolicy(policy); + break; + } + default: + LOG.error("Unsupported reconnection policy : {} ", reconnectionPolicy); + } + } + return builder; + } + + private Cluster.Builder populateRetrytPolicy(Properties properties, Cluster.Builder builder) { + String retryPolicy = properties.getProperty(CassandraStoreParameters.RETRY_POLICY); + if (retryPolicy != null) { + switch (retryPolicy) { + case "DefaultRetryPolicy": + builder = builder.withRetryPolicy(DefaultRetryPolicy.INSTANCE); + break; + case "DowngradingConsistencyRetryPolicy": + builder = builder.withRetryPolicy(DowngradingConsistencyRetryPolicy.INSTANCE); + break; + case "FallthroughRetryPolicy": + builder = builder.withRetryPolicy(FallthroughRetryPolicy.INSTANCE); + break; + case "LoggingDefaultRetryPolicy": + builder = builder.withRetryPolicy(new LoggingRetryPolicy(DefaultRetryPolicy.INSTANCE)); + break; + case "LoggingDowngradingConsistencyRetryPolicy": + builder = builder.withRetryPolicy(new LoggingRetryPolicy(DowngradingConsistencyRetryPolicy.INSTANCE)); + break; + case "LoggingFallthroughRetryPolicy": + builder = builder.withRetryPolicy(new LoggingRetryPolicy(FallthroughRetryPolicy.INSTANCE)); + break; + default: + LOG.error("Unsupported retry policy : {} ", retryPolicy); + break; + } + } + return builder; + } + + private Cluster.Builder populateSocketOptions(Properties properties, Cluster.Builder builder) { + String connectionTimeoutMillisProp = properties.getProperty(CassandraStoreParameters.CONNECTION_TIMEOUT_MILLIS); + String keepAliveProp = properties.getProperty(CassandraStoreParameters.KEEP_ALIVE); + String readTimeoutMillisProp = properties.getProperty(CassandraStoreParameters.READ_TIMEOUT_MILLIS); + String receiveBufferSizeProp = properties.getProperty(CassandraStoreParameters.RECEIVER_BUFFER_SIZE); + String reuseAddress = properties.getProperty(CassandraStoreParameters.REUSE_ADDRESS); + String sendBufferSize = properties.getProperty(CassandraStoreParameters.SEND_BUFFER_SIZE); + String soLinger = properties.getProperty(CassandraStoreParameters.SO_LINGER); + String tcpNoDelay = properties.getProperty(CassandraStoreParameters.TCP_NODELAY); + SocketOptions options = new SocketOptions(); + if (connectionTimeoutMillisProp != null) { + options.setConnectTimeoutMillis(Integer.parseInt(connectionTimeoutMillisProp)); + } + if (keepAliveProp != null) { + options.setKeepAlive(Boolean.parseBoolean(keepAliveProp)); + } + if (readTimeoutMillisProp != null) { + options.setReadTimeoutMillis(Integer.parseInt(readTimeoutMillisProp)); + } + if (receiveBufferSizeProp != null) { + options.setReceiveBufferSize(Integer.parseInt(receiveBufferSizeProp)); + } + if (reuseAddress != null) { + options.setReuseAddress(Boolean.parseBoolean(reuseAddress)); + } + if (sendBufferSize != null) { + options.setSendBufferSize(Integer.parseInt(sendBufferSize)); + } + if (soLinger != null) { + options.setSoLinger(Integer.parseInt(soLinger)); + } + if (tcpNoDelay != null) { + options.setTcpNoDelay(Boolean.parseBoolean(tcpNoDelay)); + } + return builder.withSocketOptions(options); + } + + private List<String> readCustomCodec(Properties properties) throws JDOMException, IOException { + String filename = properties.getProperty(CassandraStoreParameters.CUSTOM_CODEC_FILE); + if (filename != null) { + List<String> codecs = new ArrayList<>(); + SAXBuilder builder = new SAXBuilder(); + Document doc = builder.build(getClass().getClassLoader().getResourceAsStream(filename)); + List<Element> codecElementList = doc.getRootElement().getChildren("codec"); + for (Element codec : codecElementList) { + codecs.add(codec.getValue()); + } + return codecs; + } + return null; + } + + /** + * This method returns configured read consistency level. + * @return read Consistency level + */ + public String getReadConsistencyLevel() { + return readConsistencyLevel; + } + + /** + * This method returns configured write consistency level. + * @return write Consistency level + */ + public String getWriteConsistencyLevel() { + return writeConsistencyLevel; + } + + + public void close() { + this.session.close(); + this.cluster.close(); + } + + private void registerCustomCodecs(List<String> codecs) throws Exception { + for (String codec : codecs) { + this.cluster.getConfiguration().getCodecRegistry().register((TypeCodec<?>) Class.forName(codec).newInstance()); + } + } + +} http://git-wip-us.apache.org/repos/asf/gora/blob/89683c74/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraMapping.java ---------------------------------------------------------------------- diff --git a/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraMapping.java b/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraMapping.java new file mode 100644 index 0000000..7d5ac05 --- /dev/null +++ b/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraMapping.java @@ -0,0 +1,242 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gora.cassandra.store; + +import org.apache.gora.cassandra.bean.CassandraKey; +import org.apache.gora.cassandra.bean.Field; +import org.apache.gora.cassandra.bean.KeySpace; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * This class represents the Cassandra Mapping. + */ +public class CassandraMapping { + + private static final String PRIMARY_KEY = "primarykey"; + private CassandraKey cassandraKey; + private Map<String, String> tableProperties; + private Class keyClass; + private Class persistentClass; + private KeySpace keySpace; + private List<Field> fieldList; + private Field inlinedDefinedPartitionKey; + private String coreName; + + /** + * Constructor of the class + */ + CassandraMapping() { + this.fieldList = new ArrayList<>(); + this.tableProperties = new HashMap<>(); + } + + /** + * This method returns the KeySpace in the mapping file, + * + * @return Key space {@link KeySpace} + */ + public KeySpace getKeySpace() { + return keySpace; + } + + /** + * This method set the KeySpace in the Cassandra mapping. + * + * @param keySpace Key space {@link KeySpace} + */ + void setKeySpace(KeySpace keySpace) { + this.keySpace = keySpace; + } + + /** + * Thi method returns only the fields which belongs only for the Persistent Object. + * + * @return List of Fields + */ + public List<Field> getFieldList() { + return fieldList; + } + + /** + * This method returns the Persistent Object's Field from the mapping, according to the FieldName. + * + * @param fieldName Field Name + * @return Field {@link Field} + */ + public Field getFieldFromFieldName(String fieldName) { + for (Field field1 : fieldList) { + if (field1.getFieldName().equalsIgnoreCase(fieldName)) { + return field1; + } + } + return null; + } + + /** + * This method returns the Persistent Object's Field from the mapping, according to the ColumnName. + * + * @param columnName Column Name + * @return Field {@link Field} + */ + public Field getFieldFromColumnName(String columnName) { + for (Field field1 : fieldList) { + if (field1.getColumnName().equalsIgnoreCase(columnName)) { + return field1; + } + } + return null; + } + + /** + * This method returns the Field Names + * + * @return array of Field Names + */ + public String[] getFieldNames() { + List<String> fieldNames = new ArrayList<>(fieldList.size()); + for (Field field : fieldList) { + fieldNames.add(field.getFieldName()); + } + String[] fieldNameArray = new String[fieldNames.size()]; + return fieldNames.toArray(fieldNameArray); + } + + /** + * This method returns partition keys + * + * @return partitionKeys + */ + public String[] getAllFieldsIncludingKeys() { + List<String> fieldNames = new ArrayList<>(fieldList.size()); + for (Field field : fieldList) { + fieldNames.add(field.getFieldName()); + } + if (cassandraKey != null) { + for (Field field : cassandraKey.getFieldList()) { + fieldNames.add(field.getFieldName()); + } + } + String[] fieldNameArray = new String[fieldNames.size()]; + return fieldNames.toArray(fieldNameArray); + } + + /** + * This method return all the fields which involves with partition keys, Including composite Keys + * @return field Names + */ + public String[] getAllKeys() { + List<String> fieldNames = new ArrayList<>(); + Field keyField = getInlinedDefinedPartitionKey(); + if (cassandraKey != null) { + for (Field field : cassandraKey.getFieldList()) { + fieldNames.add(field.getFieldName()); + } + } else { + fieldNames.add(keyField.getFieldName()); + } + String[] fieldNameArray = new String[fieldNames.size()]; + return fieldNames.toArray(fieldNameArray); + } + + public CassandraKey getCassandraKey() { + return cassandraKey; + } + + void setCassandraKey(CassandraKey cassandraKey) { + this.cassandraKey = cassandraKey; + } + + public String getCoreName() { + return coreName; + } + + void setCoreName(String coreName) { + this.coreName = coreName; + } + + void addCassandraField(Field field) { + this.fieldList.add(field); + } + + void addProperty(String key, String value) { + this.tableProperties.put(key, value); + } + + public String getProperty(String key) { + return this.tableProperties.get(key); + } + + private Field getDefaultCassandraKey() { + Field field = new Field(); + field.setFieldName("defaultId"); + field.setColumnName("defaultId"); + field.setType("varchar"); + field.addProperty("primarykey", "true"); + return field; + } + + public Class getKeyClass() { + return keyClass; + } + + public void setKeyClass(Class keyClass) { + this.keyClass = keyClass; + } + + public Class getPersistentClass() { + return persistentClass; + } + + void setPersistentClass(Class persistentClass) { + this.persistentClass = persistentClass; + } + + /** + * This method return the Inlined defined Partition Key, + * If there isn't any inlined define partition keys, + * this method returns default predefined partition key "defaultId". + * + * @return Partition Key {@link Field} + */ + public Field getInlinedDefinedPartitionKey() { + if (inlinedDefinedPartitionKey != null) { + return inlinedDefinedPartitionKey; + } else { + for (Field field : fieldList) { + if (Boolean.parseBoolean(field.getProperty(PRIMARY_KEY))) { + inlinedDefinedPartitionKey = field; + break; + } + } + if (inlinedDefinedPartitionKey == null) { + return getDefaultCassandraKey(); + } + return inlinedDefinedPartitionKey; + } + } + + void finalized() { + Field field = getInlinedDefinedPartitionKey(); + if (!fieldList.contains(field) && cassandraKey == null) { + fieldList.add(field); + } + } +}