http://git-wip-us.apache.org/repos/asf/gora/blob/89683c74/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/CassandraQueryFactory.java ---------------------------------------------------------------------- diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/CassandraQueryFactory.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/CassandraQueryFactory.java deleted file mode 100644 index 928370c..0000000 --- a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/CassandraQueryFactory.java +++ /dev/null @@ -1,836 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
*/
package org.apache.gora.cassandra.serializers;

import com.datastax.driver.core.querybuilder.BuiltStatement;
import com.datastax.driver.core.querybuilder.Clause;
import com.datastax.driver.core.querybuilder.Delete;
import com.datastax.driver.core.querybuilder.QueryBuilder;
import com.datastax.driver.core.querybuilder.Select;
import com.datastax.driver.core.querybuilder.Update;
import com.datastax.driver.mapping.annotations.UDT;
import org.apache.avro.Schema;
import org.apache.gora.cassandra.bean.CassandraKey;
import org.apache.gora.cassandra.bean.ClusterKeyField;
import org.apache.gora.cassandra.bean.Field;
import org.apache.gora.cassandra.bean.KeySpace;
import org.apache.gora.cassandra.bean.PartitionKeyField;
import org.apache.gora.cassandra.query.CassandraQuery;
import org.apache.gora.cassandra.store.CassandraMapping;
import org.apache.gora.cassandra.store.CassandraStore;
import org.apache.gora.query.Query;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

/**
 * Builds the CQL statements (as query strings) used by the Cassandra serializers:
 * key space / table / user defined type DDL, and select, insert, update and delete statements.
 * All methods are static; the class holds no state besides its logger.
 */
class CassandraQueryFactory {

  private static final Logger LOG = LoggerFactory.getLogger(CassandraQueryFactory.class);

  /**
   * Relational operators that can be applied to a key column in a WHERE clause.
   */
  private enum Comparison {
    EQ, GTE, LTE
  }

  /**
   * Returns the CQL query which creates the key space if it does not exist.
   * refer : http://docs.datastax.com/en/cql/3.1/cql/cql_reference/create_keyspace_r.html
   *
   * @param mapping Cassandra Mapping {@link CassandraMapping}
   * @return CQL Query
   */
  static String getCreateKeySpaceQuery(CassandraMapping mapping) {
    KeySpace keySpace = mapping.getKeySpace();
    KeySpace.PlacementStrategy placementStrategy = keySpace.getPlacementStrategy();
    StringBuilder cql = new StringBuilder();
    cql.append("CREATE KEYSPACE IF NOT EXISTS ").append(keySpace.getName())
        .append(" WITH REPLICATION = { 'class' : '").append(placementStrategy).append("'");
    switch (placementStrategy) {
      case SimpleStrategy:
        cql.append(", 'replication_factor' : ").append(keySpace.getReplicationFactor());
        break;
      case NetworkTopologyStrategy:
        // every data center contributes its own ", 'name' : factor" pair, so an empty
        // data center map no longer leaves a dangling quote behind
        for (Map.Entry<String, Integer> dataCenter : keySpace.getDataCenters().entrySet()) {
          cql.append(", '").append(dataCenter.getKey()).append("' : ").append(dataCenter.getValue());
        }
        break;
      default:
        // no extra replication options are known for any other strategy
        break;
    }
    // the replication map is closed for every strategy
    cql.append(" }");
    // NOTE(review): the option is only written when durable writes are enabled, so a mapping
    // that disables them produces no DURABLE_WRITES clause at all - confirm this is intended.
    if (keySpace.isDurableWritesEnabled()) {
      cql.append(" AND DURABLE_WRITES = ").append(keySpace.isDurableWritesEnabled());
    }
    return cql.toString();
  }

  /**
   * Returns the CQL query which creates the table if it does not exist.
   * refer : http://docs.datastax.com/en/cql/3.1/cql/cql_reference/create_table_r.html
   * <p>
   * The persistent columns of the mapping are written first, followed by the columns of the
   * Cassandra key (when one is defined) and the PRIMARY KEY definition. The table options
   * (compact storage, id, clustering order) are appended afterwards.
   *
   * @param mapping Cassandra mapping {@link CassandraMapping}
   * @return CQL Query
   */
  static String getCreateTableQuery(CassandraMapping mapping) {
    StringBuilder cql = new StringBuilder();
    cql.append("CREATE TABLE IF NOT EXISTS ").append(mapping.getKeySpace().getName())
        .append(".").append(mapping.getCoreName()).append(" (");
    CassandraKey cassandraKey = mapping.getCassandraKey();
    List<Field> persistentFields = mapping.getFieldList();
    processFieldsForCreateTableQuery(persistentFields, false, cql);

    if (cassandraKey != null) {
      // a separator is only needed in front of the key columns when a column was already written
      processFieldsForCreateTableQuery(cassandraKey.getFieldList(), !persistentFields.isEmpty(), cql);
      appendPrimaryKey(cassandraKey.getPartitionKeyFields(), cql);
    }
    cql.append(")");

    // the first table option is introduced with WITH, every following one with AND
    boolean isWithNeeded = true;
    if (Boolean.parseBoolean(mapping.getProperty("compactStorage"))) {
      cql.append(" WITH COMPACT STORAGE ");
      isWithNeeded = false;
    }

    String id = mapping.getProperty("id");
    if (id != null) {
      cql.append(isWithNeeded ? " WITH " : " AND ");
      cql.append("ID = '").append(id).append("'");
      isWithNeeded = false;
    }

    if (cassandraKey != null) {
      List<ClusterKeyField> clusterKeyFields = cassandraKey.getClusterKeyFields();
      if (clusterKeyFields != null) {
        cql.append(isWithNeeded ? " WITH " : " AND ");
        cql.append(" CLUSTERING ORDER BY (");
        boolean isCommaNeeded = false;
        for (ClusterKeyField clusterKey : clusterKeyFields) {
          if (isCommaNeeded) {
            cql.append(", ");
          }
          cql.append(clusterKey.getColumnName()).append(" ");
          if (clusterKey.getOrder() != null) {
            cql.append(clusterKey.getOrder());
          }
          isCommaNeeded = true;
        }
        cql.append(")");
      }
    }
    return cql.toString();
  }

  /**
   * Appends the {@code , PRIMARY KEY (...)} definition. Composite partition keys are written as a
   * parenthesised column list. Nothing is appended when the partition key list is {@code null}.
   */
  private static void appendPrimaryKey(List<PartitionKeyField> partitionKeys, StringBuilder cql) {
    if (partitionKeys == null) {
      return;
    }
    cql.append(", PRIMARY KEY (");
    boolean isCommaNeeded = false;
    for (PartitionKeyField keyField : partitionKeys) {
      if (isCommaNeeded) {
        cql.append(",");
      }
      if (keyField.isComposite()) {
        cql.append("(");
        boolean isInnerCommaNeeded = false;
        for (Field field : keyField.getFields()) {
          if (isInnerCommaNeeded) {
            cql.append(", ");
          }
          cql.append(field.getColumnName());
          isInnerCommaNeeded = true;
        }
        cql.append(")");
      } else {
        cql.append(keyField.getColumnName());
      }
      isCommaNeeded = true;
    }
    cql.append(")");
  }

  /**
   * Appends {@code <column> <type>} definitions for the given fields, honouring the
   * {@code static} and {@code primarykey} field properties.
   *
   * @param fields        fields to be written
   * @param isCommaNeeded whether a separator has to be written in front of the first field
   * @param stringBuilder builder the definitions are appended to
   */
  private static void processFieldsForCreateTableQuery(List<Field> fields, boolean isCommaNeeded, StringBuilder stringBuilder) {
    boolean separatorNeeded = isCommaNeeded;
    for (Field field : fields) {
      if (separatorNeeded) {
        stringBuilder.append(", ");
      }
      stringBuilder.append(field.getColumnName()).append(" ").append(field.getType());
      if (Boolean.parseBoolean(field.getProperty("static"))) {
        stringBuilder.append(" STATIC");
      }
      if (Boolean.parseBoolean(field.getProperty("primarykey"))) {
        stringBuilder.append(" PRIMARY KEY ");
      }
      separatorNeeded = true;
    }
  }

  /**
   * Returns the CQL query which drops the table if it exists.
   * refer : http://docs.datastax.com/en/cql/3.1/cql/cql_reference/drop_table_r.html
   *
   * @param mapping Cassandra Mapping {@link CassandraMapping}
   * @return CQL query
   */
  static String getDropTableQuery(CassandraMapping mapping) {
    return "DROP TABLE IF EXISTS " + mapping.getKeySpace().getName() + "." + mapping.getCoreName();
  }

  /**
   * Returns the CQL query which drops the key space if it exists.
   * refer : http://docs.datastax.com/en/cql/3.1/cql/cql_reference/drop_keyspace_r.html
   *
   * @param mapping Cassandra Mapping {@link CassandraMapping}
   * @return CQL query
   */
  static String getDropKeySpaceQuery(CassandraMapping mapping) {
    return "DROP KEYSPACE IF EXISTS " + mapping.getKeySpace().getName();
  }

  /**
   * Returns the CQL query which truncates (removes all the data of) the table.
   * refer : http://docs.datastax.com/en/cql/3.1/cql/cql_reference/truncate_r.html
   *
   * @param mapping Cassandra Mapping {@link CassandraMapping}
   * @return CQL query
   */
  static String getTruncateTableQuery(CassandraMapping mapping) {
    return QueryBuilder.truncate(mapping.getKeySpace().getName(), mapping.getCoreName()).getQueryString();
  }

  /**
   * Returns the CQL query which inserts data into the table.
   * refer : http://docs.datastax.com/en/cql/3.1/cql/cql_reference/insert_r.html
   *
   * @param mapping Cassandra Mapping {@link CassandraMapping}
   * @param fields  available fields
   * @return CQL Query
   */
  static String getInsertDataQuery(CassandraMapping mapping, List<String> fields) {
    String[] columnNames = getColumnNames(mapping, fields);
    // one "?" per resolved column: fields without a mapping are dropped by getColumnNames,
    // so sizing by fields.size() could hand the builder more values than names
    String[] placeHolders = new String[columnNames.length];
    Arrays.fill(placeHolders, "?");
    return QueryBuilder.insertInto(mapping.getKeySpace().getName(), mapping.getCoreName())
        .values(columnNames, placeHolders).getQueryString();
  }

  /**
   * Returns the CQL query which deletes a persistent from the table. Every given field becomes
   * an equality condition of the WHERE clause.
   * refer : http://docs.datastax.com/en/cql/3.3/cql/cql_reference/cqlDelete.html
   *
   * @param mapping Cassandra Mapping {@link CassandraMapping}
   * @param fields  fields which identify the persistent to be deleted
   * @return CQL Query, or {@code null} when none of the fields could be resolved to a column
   */
  static String getDeleteDataQuery(CassandraMapping mapping, List<String> fields) {
    String[] columnNames = getColumnNames(mapping, fields);
    Delete delete = QueryBuilder.delete().from(mapping.getKeySpace().getName(), mapping.getCoreName());
    return processKeys(columnNames, delete);
  }

  /**
   * Adds an equality condition for every column to the given statement.
   *
   * @param columnNames columns to be compared against a "?" value
   * @param delete      statement the conditions are added to
   * @return the query string, or {@code null} when no column was given
   */
  private static String processKeys(String[] columnNames, BuiltStatement delete) {
    BuiltStatement query = null;
    for (String columnName : columnNames) {
      query = appendCondition(delete, query, Comparison.EQ, columnName);
    }
    return query != null ? query.getQueryString() : null;
  }

  /**
   * Returns the CQL Select query which retrieves all columns of the rows matching the key fields.
   * refer: http://docs.datastax.com/en/cql/3.3/cql/cql_reference/cqlSelect.html
   *
   * @param mapping   Cassandra Mapping {@link CassandraMapping}
   * @param keyFields key fields
   * @return CQL Query, or {@code null} when none of the key fields could be resolved to a column
   */
  static String getSelectObjectQuery(CassandraMapping mapping, List<String> keyFields) {
    Select select = QueryBuilder.select().from(mapping.getKeySpace().getName(), mapping.getCoreName());
    applyAllowFiltering(mapping, select);
    return processKeys(getColumnNames(mapping, keyFields), select);
  }

  /**
   * Returns the CQL Select query which retrieves the given fields of the rows matching the key fields.
   * This method is used for Avro Serialization.
   * refer: http://docs.datastax.com/en/cql/3.3/cql/cql_reference/cqlSelect.html
   *
   * @param mapping   Cassandra Mapping {@link CassandraMapping}
   * @param fields    Given fields to retrieve
   * @param keyFields key fields
   * @return CQL Query, or {@code null} when none of the key fields could be resolved to a column
   */
  static String getSelectObjectWithFieldsQuery(CassandraMapping mapping, String[] fields, List<String> keyFields) {
    String[] selectedColumns = getColumnNames(mapping, Arrays.asList(fields));
    Select select = QueryBuilder.select(selectedColumns).from(mapping.getKeySpace().getName(), mapping.getCoreName());
    applyAllowFiltering(mapping, select);
    return processKeys(getColumnNames(mapping, keyFields), select);
  }

  /**
   * Returns the CQL Select query which retrieves the given fields. The WHERE clause is built from
   * the fields of the Cassandra key when one is defined, otherwise from the first field carrying
   * the {@code primarykey} property.
   * This method is used for Native Serialization.
   * refer: http://docs.datastax.com/en/cql/3.3/cql/cql_reference/cqlSelect.html
   *
   * @param mapping Cassandra Mapping {@link CassandraMapping}
   * @param fields  Given fields to retrieve
   * @return CQL Query, or {@code null} when no key column could be determined
   */
  static String getSelectObjectWithFieldsQuery(CassandraMapping mapping, String[] fields) {
    String[] columnNames = getColumnNames(mapping, Arrays.asList(fields));
    Select select = QueryBuilder.select(columnNames).from(mapping.getKeySpace().getName(), mapping.getCoreName());
    applyAllowFiltering(mapping, select);

    CassandraKey cKey = mapping.getCassandraKey();
    if (cKey != null) {
      Select.Where query = null;
      for (Field keyField : cKey.getFieldList()) {
        Clause condition = QueryBuilder.eq(keyField.getColumnName(), "?");
        query = (query == null) ? select.where(condition) : query.and(condition);
      }
      return query != null ? query.getQueryString() : null;
    }
    for (Field field : mapping.getFieldList()) {
      if (Boolean.parseBoolean(field.getProperty("primarykey"))) {
        return select.where(QueryBuilder.eq(field.getColumnName(), "?")).getQueryString();
      }
    }
    return null;
  }

  /**
   * Switches ALLOW FILTERING on for the select when the mapping property {@code allowFiltering} is true.
   */
  private static void applyAllowFiltering(CassandraMapping mapping, Select select) {
    if (Boolean.parseBoolean(mapping.getProperty("allowFiltering"))) {
      select.allowFiltering();
    }
  }

  /**
   * Returns the CQL query for the execute method, i.e. a Select query which retrieves the given
   * fields of the rows selected by the key or key range of the query.
   *
   * @param mapping        Cassandra Mapping {@link CassandraMapping}
   * @param cassandraQuery Query {@link CassandraQuery}
   * @param objects        list which receives the key values used by the WHERE clause
   * @param fields         fields to retrieve
   * @return CQL Query
   */
  static String getExecuteQuery(CassandraMapping mapping, Query cassandraQuery, List<Object> objects, String[] fields) {
    long limit = cassandraQuery.getLimit();
    Select select = QueryBuilder.select(getColumnNames(mapping, Arrays.asList(fields)))
        .from(mapping.getKeySpace().getName(), mapping.getCoreName());
    if (limit > 0) {
      // the builder takes an int: clamp instead of letting a large long wrap around in the cast
      select = select.limit((int) Math.min(limit, Integer.MAX_VALUE));
    }
    applyAllowFiltering(mapping, select);
    return processQuery(cassandraQuery, select, mapping, objects);
  }

  /**
   * Adds the WHERE clause described by the query to the given statement.
   * <p>
   * When the query has a key, the key columns are compared for equality. Otherwise the start key
   * (greater than or equal) and the end key (less than or equal) are applied, if present. The
   * value of every condition is added to {@code objects} in the order the conditions are created.
   *
   * @param cassandraQuery query carrying key, start key and end key
   * @param select         statement (select, delete or update assignments) to be restricted
   * @param mapping        Cassandra mapping {@link CassandraMapping}
   * @param objects        list which receives the condition values
   * @return the query string of the unrestricted statement when the query has neither key, start
   * key nor end key; otherwise the restricted query string, or {@code null} when no condition
   * was created
   * @throws RuntimeException when a key range is requested for a delete statement
   */
  private static String processQuery(Query cassandraQuery, BuiltStatement select, CassandraMapping mapping, List<Object> objects) {
    Object startKey = cassandraQuery.getStartKey();
    Object endKey = cassandraQuery.getEndKey();
    Object key = cassandraQuery.getKey();

    BuiltStatement query = null;
    if (key != null) {
      query = appendKeyConditions(mapping, select, query, Comparison.EQ, key, objects);
    } else {
      if (startKey != null) {
        query = appendKeyConditions(mapping, select, query, Comparison.GTE, startKey, objects);
      }
      if (endKey != null) {
        query = appendKeyConditions(mapping, select, query, Comparison.LTE, endKey, objects);
      }
    }
    if (startKey == null && endKey == null && key == null) {
      return select.getQueryString();
    }
    return query != null ? query.getQueryString() : null;
  }

  /**
   * Adds one condition per key column for the given key value. With a Cassandra key the key value
   * is split into columns and values by {@link AvroCassandraUtils#processKeys}; without one the
   * column of the field carrying the {@code primarykey} property is compared with the key value itself.
   *
   * @return the statement holding all conditions created so far, {@code null} when there is none
   */
  private static BuiltStatement appendKeyConditions(CassandraMapping mapping, BuiltStatement statement, BuiltStatement query,
                                                    Comparison comparison, Object keyValue, List<Object> objects) {
    BuiltStatement restricted = query;
    if (mapping.getCassandraKey() != null) {
      ArrayList<String> cassandraKeys = new ArrayList<>();
      ArrayList<Object> cassandraValues = new ArrayList<>();
      AvroCassandraUtils.processKeys(mapping, keyValue, cassandraKeys, cassandraValues);
      String[] columnKeys = getColumnNames(mapping, cassandraKeys);
      for (int i = 0; i < cassandraKeys.size(); i++) {
        restricted = appendCondition(statement, restricted, comparison, columnKeys[i]);
        objects.add(cassandraValues.get(i));
      }
    } else {
      restricted = appendCondition(statement, restricted, comparison, getPKey(mapping.getFieldList()));
      objects.add(keyValue);
    }
    return restricted;
  }

  /**
   * Adds a single condition to the statement: the first condition opens the WHERE clause of
   * {@code statement}, every further one is chained with AND onto {@code query}.
   *
   * @param statement  the base statement (select, delete or update assignments)
   * @param query      the statement holding the conditions created so far, {@code null} when there is none
   * @param comparison operator of the condition
   * @param column     column the condition applies to
   * @return the statement holding all conditions
   * @throws RuntimeException when a range condition is requested for a delete statement
   */
  private static BuiltStatement appendCondition(BuiltStatement statement, BuiltStatement query, Comparison comparison, String column) {
    if (statement instanceof Delete && comparison != Comparison.EQ) {
      /*
       According to the JIRA https://issues.apache.org/jira/browse/CASSANDRA-7651 this has been fixed, but It seems this not fixed yet.
       */
      throw new RuntimeException("Delete by Query is not suppoted for Key Ranges.");
    }
    Clause condition = toClause(comparison, column);
    if (statement instanceof Select) {
      return query == null ? ((Select) statement).where(condition) : ((Select.Where) query).and(condition);
    }
    if (statement instanceof Delete) {
      return query == null ? ((Delete) statement).where(condition) : ((Delete.Where) query).and(condition);
    }
    return query == null ? ((Update.Assignments) statement).where(condition) : ((Update.Where) query).and(condition);
  }

  /**
   * Creates the clause comparing the column with a "?" value using the given operator.
   */
  private static Clause toClause(Comparison comparison, String column) {
    switch (comparison) {
      case GTE:
        return QueryBuilder.gte(column, "?");
      case LTE:
        return QueryBuilder.lte(column, "?");
      default:
        return QueryBuilder.eq(column, "?");
    }
  }

  /**
   * Resolves field names to column names. A field is looked up in the persistent fields of the
   * mapping first and in the Cassandra key (when defined) second; fields found in neither are
   * logged and left out, so the result may be shorter than the input.
   *
   * @param mapping Cassandra mapping {@link CassandraMapping}
   * @param fields  field names
   * @return column names of the fields which could be resolved
   */
  private static String[] getColumnNames(CassandraMapping mapping, List<String> fields) {
    CassandraKey cassandraKey = mapping.getCassandraKey();
    List<String> columnNames = new ArrayList<>(fields.size());
    for (String field : fields) {
      Field resolved = mapping.getFieldFromFieldName(field);
      if (resolved == null && cassandraKey != null) {
        resolved = cassandraKey.getFieldFromFieldName(field);
      }
      if (resolved != null) {
        columnNames.add(resolved.getColumnName());
      } else {
        LOG.warn("{} field is ignored, couldn't find relevant field in the persistent mapping", field);
      }
    }
    return columnNames.toArray(new String[0]);
  }

  /**
   * Returns the column name of the first field carrying the {@code primarykey} property.
   *
   * @param fields fields to be searched
   * @return column name, or {@code null} when no field is marked as primary key
   */
  private static String getPKey(List<Field> fields) {
    for (Field field : fields) {
      if (Boolean.parseBoolean(field.getProperty("primarykey"))) {
        return field.getColumnName();
      }
    }
    return null;
  }

  /**
   * Returns the CQL query for the DeleteByQuery method. When the query names fields only those
   * columns are deleted, otherwise the statement deletes without a column list.
   * refer: http://docs.datastax.com/en/cql/3.3/cql/cql_reference/cqlDelete.html
   *
   * @param mapping        Cassandra Mapping {@link CassandraMapping}
   * @param cassandraQuery Cassandra Query {@link CassandraQuery}
   * @param objects        list which receives the key values used by the WHERE clause
   * @return CQL Query
   */
  static String getDeleteByQuery(CassandraMapping mapping, Query cassandraQuery, List<Object> objects) {
    String[] fields = cassandraQuery.getFields();
    String keySpaceName = mapping.getKeySpace().getName();
    Delete delete = fields != null
        ? QueryBuilder.delete(getColumnNames(mapping, Arrays.asList(fields))).from(keySpaceName, mapping.getCoreName())
        : QueryBuilder.delete().from(keySpaceName, mapping.getCoreName());
    return processQuery(cassandraQuery, delete, mapping, objects);
  }

  /**
   * Returns the CQL query for the UpdateByQuery method (Avro serialization). The update values
   * are converted with {@link AvroCassandraUtils#getFieldValueFromAvroBean} and added to
   * {@code objects} ahead of the key values.
   * refer : http://docs.datastax.com/en/cql/3.3/cql/cql_reference/cqlUpdate.html
   *
   * @param mapping        Cassandra mapping {@link CassandraMapping}
   * @param cassandraQuery Cassandra Query {@link CassandraQuery}
   * @param objects        list which receives the update values followed by the key values
   * @param schema         Avro schema of the persistent class
   * @return CQL Query
   * @throws RuntimeException         when the query is not a {@link CassandraQuery} or a field is
   *                                  missing in the schema
   * @throws IllegalArgumentException when none of the query fields resolves to a column
   */
  static String getUpdateByQueryForAvro(CassandraMapping mapping, Query cassandraQuery, List<Object> objects, Schema schema) {
    Update update = QueryBuilder.update(mapping.getKeySpace().getName(), mapping.getCoreName());
    if (!(cassandraQuery instanceof CassandraQuery)) {
      throw new RuntimeException("Please use Cassandra Query object to invoke, UpdateByQuery method.");
    }
    CassandraQuery updateQuery = (CassandraQuery) cassandraQuery;
    Update.Assignments updateAssignments = null;
    for (String column : getColumnNames(mapping, Arrays.asList(cassandraQuery.getFields()))) {
      updateAssignments = update.with(QueryBuilder.set(column, "?"));
      Field field = mapping.getFieldFromColumnName(column);
      Object value = updateQuery.getUpdateFieldValue(field.getFieldName());
      try {
        Schema schemaField = schema.getField(field.getFieldName()).schema();
        objects.add(AvroCassandraUtils.getFieldValueFromAvroBean(schemaField, schemaField.getType(), value, field));
      } catch (NullPointerException e) {
        // keep the cause so the failing lookup stays visible in the stack trace
        throw new RuntimeException(field + " field couldn't find in the class " + mapping.getPersistentClass() + ".", e);
      }
    }
    return processQuery(cassandraQuery, requireAssignments(updateAssignments), mapping, objects);
  }

  /**
   * Returns the CQL query for the UpdateByQuery method (native serialization). The update values
   * are added to {@code objects} ahead of the key values.
   * refer : http://docs.datastax.com/en/cql/3.3/cql/cql_reference/cqlUpdate.html
   *
   * @param mapping        Cassandra mapping {@link CassandraMapping}
   * @param cassandraQuery Cassandra Query {@link CassandraQuery}
   * @param objects        list which receives the update values followed by the key values
   * @return CQL Query
   * @throws RuntimeException         when the query is not a {@link CassandraQuery}
   * @throws IllegalArgumentException when none of the query fields resolves to a column
   */
  static String getUpdateByQueryForNative(CassandraMapping mapping, Query cassandraQuery, List<Object> objects) {
    Update update = QueryBuilder.update(mapping.getKeySpace().getName(), mapping.getCoreName());
    if (!(cassandraQuery instanceof CassandraQuery)) {
      throw new RuntimeException("Please use Cassandra Query object to invoke, UpdateByQuery method.");
    }
    CassandraQuery updateQuery = (CassandraQuery) cassandraQuery;
    Update.Assignments updateAssignments = null;
    for (String column : getColumnNames(mapping, Arrays.asList(cassandraQuery.getFields()))) {
      updateAssignments = update.with(QueryBuilder.set(column, "?"));
      objects.add(updateQuery.getUpdateFieldValue(mapping.getFieldFromColumnName(column).getFieldName()));
    }
    return processQuery(cassandraQuery, requireAssignments(updateAssignments), mapping, objects);
  }

  /**
   * Rejects an update without any assignment with a descriptive exception instead of letting a
   * {@code null} statement fail later on.
   */
  private static Update.Assignments requireAssignments(Update.Assignments updateAssignments) {
    if (updateAssignments == null) {
      throw new IllegalArgumentException("UpdateByQuery needs at least one field which is available in the persistent mapping.");
    }
    return updateAssignments;
  }

  /**
   * Appends the CQL type which corresponds to the Avro schema. Maps and arrays are resolved
   * recursively; for a union the first non-null branch is used; a NULL schema appends nothing.
   *
   * @param schema  Avro schema of the field
   * @param builder builder the CQL type is appended to
   * @throws Exception when a union refers to a record whose name is already contained in the builder
   */
  private static void populateFieldsToQuery(Schema schema, StringBuilder builder) throws Exception {
    switch (schema.getType()) {
      case INT:
        builder.append("int");
        break;
      case LONG:
        builder.append("bigint");
        break;
      case FLOAT:
        builder.append("float");
        break;
      case DOUBLE:
        builder.append("double");
        break;
      case BOOLEAN:
        builder.append("boolean");
        break;
      case BYTES:
        builder.append("blob");
        break;
      case STRING:
      case FIXED:
      case ENUM:
        builder.append("text");
        break;
      case MAP:
        builder.append("map<text,");
        populateFieldsToQuery(schema.getValueType(), builder);
        builder.append(">");
        break;
      case ARRAY:
        builder.append("list<");
        populateFieldsToQuery(schema.getElementType(), builder);
        builder.append(">");
        break;
      case RECORD:
        builder.append("frozen<").append(schema.getName()).append(">");
        break;
      case UNION:
        for (Schema unionElementSchema : schema.getTypes()) {
          if (unionElementSchema.getType().equals(Schema.Type.RECORD)) {
            String recordName = unionElementSchema.getName();
            if (builder.toString().contains(recordName)) {
              LOG.warn("Same Field Type can't be mapped recursively. This is not supported with Cassandra UDT types, Please use byte dataType for recursive mapping.");
              throw new Exception("Same Field Type has mapped recursively");
            }
            builder.append("frozen<").append(recordName).append(">");
            break;
          } else if (!unionElementSchema.getType().equals(Schema.Type.NULL)) {
            populateFieldsToQuery(unionElementSchema, builder);
            break;
          }
        }
        break;
    }
  }

  /**
   * Appends {@code <name> <type>} definitions, separated by commas, for the fields of the record.
   * A field whose type cannot be mapped is left out completely.
   *
   * @param recordSchema  Avro schema of the record
   * @param stringBuilder builder the definitions are appended to
   */
  static void processRecord(Schema recordSchema, StringBuilder stringBuilder) {
    boolean isCommaNeeded = false;
    for (Schema.Field field : recordSchema.getFields()) {
      // everything written for this field, separator included, is rolled back on failure
      int lengthBeforeField = stringBuilder.length();
      if (isCommaNeeded) {
        stringBuilder.append(", ");
      }
      String fieldName = field.name();
      stringBuilder.append(fieldName).append(" ");
      try {
        populateFieldsToQuery(field.schema(), stringBuilder);
        isCommaNeeded = true;
      } catch (Exception e) {
        LOG.warn("{} field is ignored, couldn't map its type to a CQL type", fieldName, e);
        stringBuilder.setLength(lengthBeforeField);
      }
    }
  }

  /**
   * Returns the CQL query which creates the user defined type of a field of the persistent class
   * (native serialization). Only fields of the UDT class annotated with
   * {@link com.datastax.driver.mapping.annotations.Field} are written; the annotation name is
   * used as column name when it is not empty, otherwise the Java field name.
   *
   * @param mapping         Cassandra mapping {@link CassandraMapping}
   * @param persistentClass persistent class declaring the field
   * @param udtType         name of the user defined type
   * @param fieldName       name of the field whose type is the UDT class
   * @return CQL Query
   * @throws NoSuchFieldException when the persistent class does not declare the field
   * @throws RuntimeException     when the type of the field is not annotated with {@link UDT}
   */
  static String getCreateUDTTypeForNative(CassandraMapping mapping, Class persistentClass, String udtType, String fieldName) throws NoSuchFieldException {
    Class<?> udtClass = persistentClass.getDeclaredField(fieldName).getType();
    if (udtClass.getAnnotation(UDT.class) == null) {
      throw new RuntimeException(udtClass.getName() + " is not annotated with @UDT, couldn't create the user defined type " + udtType + ".");
    }
    StringBuilder cql = new StringBuilder();
    cql.append("CREATE TYPE IF NOT EXISTS ").append(mapping.getKeySpace().getName()).append(".").append(udtType).append(" (");
    boolean isCommaNeeded = false;
    for (java.lang.reflect.Field udtField : udtClass.getDeclaredFields()) {
      com.datastax.driver.mapping.annotations.Field fieldAnnotation =
          udtField.getDeclaredAnnotation(com.datastax.driver.mapping.annotations.Field.class);
      if (fieldAnnotation == null) {
        continue;
      }
      if (isCommaNeeded) {
        cql.append(", ");
      }
      String columnName = fieldAnnotation.name().isEmpty() ? udtField.getName() : fieldAnnotation.name();
      cql.append(columnName).append(" ").append(dataType(udtField, null));
      isCommaNeeded = true;
    }
    cql.append(")");
    return cql.toString();
  }

  /**
   * Returns the CQL query which creates the user defined type for an Avro record schema.
   *
   * @param mapping     Cassandra mapping {@link CassandraMapping}
   * @param udtType     name of the user defined type
   * @param fieldSchema Avro record schema describing the fields of the type
   * @return CQL Query
   */
  static String getCreateUDTTypeForAvro(CassandraMapping mapping, String udtType, Schema fieldSchema) {
    StringBuilder cql = new StringBuilder();
    cql.append("CREATE TYPE IF NOT EXISTS ").append(mapping.getKeySpace().getName()).append(".").append(udtType).append(" (");
    CassandraQueryFactory.processRecord(fieldSchema, cql);
    cql.append(")");
    return cql.toString();
  }

  /**
   * Maps a Java type to its CQL type. Exactly one of the two arguments is expected to be given:
   * the field for a top level call, the type for the type arguments of a collection.
   *
   * @param field     Java field, may be {@code null} when {@code fieldType} is given
   * @param fieldType Java type, only read when {@code field} is {@code null}
   * @return CQL type
   * @throws RuntimeException when the type has no CQL counterpart here
   */
  private static String dataType(java.lang.reflect.Field field, Type fieldType) {
    // only the raw type decides the mapping; matching against the full parameterized name
    // would e.g. classify List<Map<..>> as a map
    String rawTypeName;
    if (field != null) {
      rawTypeName = field.getType().getName();
    } else if (fieldType instanceof ParameterizedType) {
      rawTypeName = ((ParameterizedType) fieldType).getRawType().getTypeName();
    } else {
      rawTypeName = fieldType.getTypeName();
    }

    switch (rawTypeName) {
      case "java.lang.String":
      case "java.lang.CharSequence":
        return "text";
      case "int":
      case "java.lang.Integer":
        return "int";
      case "double":
      case "java.lang.Double":
        return "double";
      case "float":
      case "java.lang.Float":
        return "float";
      case "boolean":
      case "java.lang.Boolean":
        return "boolean";
      case "java.util.UUID":
        return "uuid";
      case "long": // primitive long is mapped like its wrapper, in line with the other primitives
      case "java.lang.Long":
        return "bigint";
      case "java.math.BigDecimal":
        return "decimal";
      case "java.net.InetAddress":
        return "inet";
      case "java.math.BigInteger":
        return "varint";
      case "java.nio.ByteBuffer":
        return "blob";
      default:
        break;
    }

    if (rawTypeName.contains("Map")) {
      Type[] arguments = typeArguments(field, fieldType);
      Type keyType = arguments[0];
      Type valueType = arguments[1];
      return "map<" + dataType(null, keyType) + "," + dataType(null, valueType) + ">";
    }
    if (rawTypeName.contains("List")) {
      return "list<" + dataType(null, typeArguments(field, fieldType)[0]) + ">";
    }
    if (rawTypeName.contains("Set")) {
      return "set<" + dataType(null, typeArguments(field, fieldType)[0]) + ">";
    }
    throw new RuntimeException("Unsupported Cassandra DataType");
  }

  /**
   * Returns the actual type arguments of the generic type of the field, or of the given type
   * when no field is given. The type has to be parameterized.
   */
  private static Type[] typeArguments(java.lang.reflect.Field field, Type fieldType) {
    Type genericType = field != null ? field.getGenericType() : fieldType;
    return ((ParameterizedType) genericType).getActualTypeArguments();
  }

}
http://git-wip-us.apache.org/repos/asf/gora/blob/89683c74/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/CassandraSerializer.java ---------------------------------------------------------------------- diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/CassandraSerializer.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/CassandraSerializer.java deleted file mode 100644 index 208428e..0000000 --- a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/CassandraSerializer.java +++ /dev/null @@ -1,225 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.gora.cassandra.serializers; - -import com.datastax.driver.core.ConsistencyLevel; -import com.datastax.driver.core.KeyspaceMetadata; -import com.datastax.driver.core.ResultSet; -import com.datastax.driver.core.SimpleStatement; -import com.datastax.driver.core.TableMetadata; -import org.apache.gora.cassandra.bean.Field; -import org.apache.gora.cassandra.store.CassandraClient; -import org.apache.gora.cassandra.store.CassandraMapping; -import org.apache.gora.cassandra.store.CassandraStore; -import org.apache.gora.persistency.Persistent; -import org.apache.gora.query.Query; -import org.apache.gora.query.Result; -import org.apache.gora.store.DataStore; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.ArrayList; -import java.util.List; -import java.util.Locale; -import java.util.Map; - -/** - * This is the abstract Cassandra Serializer class. - */ -public abstract class CassandraSerializer<K, T extends Persistent> { - - /* log under this class, not under CassandraStore */ - private static final Logger LOG = LoggerFactory.getLogger(CassandraSerializer.class); - - protected Class<K> keyClass; - - protected Class<T> persistentClass; - - protected CassandraMapping mapping; - - protected CassandraClient client; - - protected String readConsistencyLevel; - - protected String writeConsistencyLevel; - - /* user defined type name mapped to the CQL query that creates it; executed by createSchema() */ - protected Map<String, String> userDefineTypeMaps; - - /** - * Stores the client, classes and mapping and reads both consistency levels from the client. - * - * @param cc Cassandra Client - * @param keyClass key class - * @param persistantClass persistent class - * @param mapping Cassandra Mapping - */ - CassandraSerializer(CassandraClient cc, Class<K> keyClass, Class<T> persistantClass, CassandraMapping mapping) { - this.keyClass = keyClass; - this.persistentClass = persistantClass; - this.client = cc; - this.mapping = mapping; - this.readConsistencyLevel = client.getReadConsistencyLevel(); - this.writeConsistencyLevel = client.getWriteConsistencyLevel(); - } - - /** - * This method returns the Cassandra Serializer according to the Cassandra serializer property. 
- * A null or empty type selects the native serializer. - * - * @param cc Cassandra Client - * @param type Serialization type - * @param dataStore Cassandra DataStore - * @param mapping Cassandra Mapping - * @param <K> key class - * @param <T> persistent class - * @return Serializer - */ - public static <K, T extends Persistent> CassandraSerializer getSerializer(CassandraClient cc, String type, final DataStore<K, T> dataStore, CassandraMapping mapping) { - CassandraStore.SerializerType serType = type == null || type.isEmpty() ? CassandraStore.SerializerType.NATIVE : CassandraStore.SerializerType.valueOf(type.toUpperCase(Locale.ENGLISH)); - CassandraSerializer serializer; - switch (serType) { - case AVRO: - serializer = new AvroSerializer(cc, dataStore, mapping); - break; - case NATIVE: - default: - serializer = new NativeSerializer(cc, dataStore.getKeyClass(), dataStore.getPersistentClass(), mapping); - } - return serializer; - } - - /** - * Analyzes the persistent class to find inner records with UDT type; this method should be called in every Cassandra serializer constructor. 
- * - * @throws Exception when the persistent class cannot be analyzed - */ - protected abstract void analyzePersistent() throws Exception; - - /** - * Creates the key space, then every user defined type in userDefineTypeMaps, then the table. - */ - public void createSchema() { - LOG.debug("creating Cassandra keyspace {}", mapping.getKeySpace().getName()); - this.client.getSession().execute(CassandraQueryFactory.getCreateKeySpaceQuery(mapping)); - for (Map.Entry<String, String> udtType : userDefineTypeMaps.entrySet()) { - LOG.debug("creating Cassandra User Define Type {}", udtType.getKey()); - this.client.getSession().execute(udtType.getValue()); - } - LOG.debug("creating Cassandra column family / table {}", mapping.getCoreName()); - this.client.getSession().execute(CassandraQueryFactory.getCreateTableQuery(mapping)); - } - - /** - * Drops the table and then the key space. - */ - public void deleteSchema() { - LOG.debug("dropping Cassandra table {}", mapping.getCoreName()); - this.client.getSession().execute(CassandraQueryFactory.getDropTableQuery(mapping)); - LOG.debug("dropping Cassandra keyspace {}", mapping.getKeySpace().getName()); - this.client.getSession().execute(CassandraQueryFactory.getDropKeySpaceQuery(mapping)); - } - - /** - * Closes the Cassandra client. - */ - public void close() { - this.client.close(); - } - - /** - * Truncates the table. - */ - public void truncateSchema() { - LOG.debug("truncating Cassandra table {}", mapping.getCoreName()); - this.client.getSession().execute(CassandraQueryFactory.getTruncateTableQuery(mapping)); - } - - /** - * Checks the cluster metadata for the mapped key space and table. - * - * @return true if both the key space and the table exist - */ - public boolean schemaExists() { - KeyspaceMetadata keyspace = this.client.getCluster().getMetadata().getKeyspace(mapping.getKeySpace().getName()); - if (keyspace != null) { - TableMetadata table = keyspace.getTable(mapping.getCoreName()); - return table != null; - } else { - return false; - } - } - - /** - * Returns the names of all fields in the mapping. - * - * @return field names - */ - protected String[] getFields() { - List<String> fields = new ArrayList<>(); - for (Field field : mapping.getFieldList()) { - fields.add(field.getFieldName()); - } - return fields.toArray(new String[0]); - } - - /** - * Inserts the persistent Object - * - * @param key key value - * @param value persistent value - */ - public abstract void put(K key, T value); - - /** - * Retrieves the persistent value 
according to the key - * - * @param key key value - * @return persistent value - */ - public abstract T get(K key); - - /** - * Deletes persistent value according to the key - * - * @param key key value - * @return isDeleted - */ - public abstract boolean delete(K key); - - /** - * Retrieves the persistent value according to the key and fields - * - * @param key key value - * @param fields fields - * @return persistent value - */ - public abstract T get(K key, String[] fields); - - /** - * Executes the given query and returns the results. - * - * @param dataStore Cassandra data store - * @param query Cassandra Query - * @return Cassandra Result - */ - public abstract Result<K, T> execute(DataStore<K, T> dataStore, Query<K, T> query); - - /** - * Update the persistent objects - * - * @param query Cassandra Query - * @return isUpdated - */ - public abstract boolean updateByQuery(Query query); - - /** - * Deletes the rows selected by the query. A query without any key and without fields truncates the table; a query without any key but with fields is only logged as unsupported. - * - * @param query Cassandra Query - * @return always 0, the deleted element count is not available - */ - public long deleteByQuery(Query query) { - List<Object> objectArrayList = new ArrayList<>(); - if (query.getKey() == null && query.getEndKey() == null && query.getStartKey() == null) { - if (query.getFields() == null) { - client.getSession().execute(CassandraQueryFactory.getTruncateTableQuery(mapping)); - } else { - LOG.error("Delete by Query is not supported for the Queries which didn't specify Query keys with fields."); - } - } else { - String cqlQuery = CassandraQueryFactory.getDeleteByQuery(mapping, query, objectArrayList); - ResultSet results; - SimpleStatement statement; - if (objectArrayList.isEmpty()) { - statement = new SimpleStatement(cqlQuery); - } else { - statement = new SimpleStatement(cqlQuery, objectArrayList.toArray()); - } - if (writeConsistencyLevel != null) { - statement.setConsistencyLevel(ConsistencyLevel.valueOf(writeConsistencyLevel)); - } - results = client.getSession().execute(statement); - LOG.debug("Delete by Query was applied : {}", results.wasApplied()); - } - LOG.info("Delete By Query method doesn't return the deleted element count."); - 
return 0; - } - -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/gora/blob/89683c74/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/NativeSerializer.java ---------------------------------------------------------------------- diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/NativeSerializer.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/NativeSerializer.java deleted file mode 100644 index bf28ee0..0000000 --- a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/NativeSerializer.java +++ /dev/null @@ -1,243 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.gora.cassandra.serializers; - -import com.datastax.driver.core.ConsistencyLevel; -import com.datastax.driver.core.ResultSet; -import com.datastax.driver.core.SimpleStatement; -import com.datastax.driver.mapping.Mapper; -import com.datastax.driver.mapping.MappingManager; -import com.datastax.driver.mapping.Result; -import org.apache.commons.lang.ArrayUtils; -import org.apache.gora.cassandra.bean.Field; -import org.apache.gora.cassandra.query.CassandraResultSet; -import org.apache.gora.cassandra.store.CassandraClient; -import org.apache.gora.cassandra.store.CassandraMapping; -import org.apache.gora.persistency.Persistent; -import org.apache.gora.query.Query; -import org.apache.gora.store.DataStore; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.lang.reflect.Method; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; - -/** - * This Class contains the operation relates to Native Serialization. 
- */ -class NativeSerializer<K, T extends Persistent> extends CassandraSerializer { - - private static final Logger LOG = LoggerFactory.getLogger(NativeSerializer.class); - - private Mapper<T> mapper; - - NativeSerializer(CassandraClient cassandraClient, Class<K> keyClass, Class<T> persistentClass, CassandraMapping mapping) { - super(cassandraClient, keyClass, persistentClass, mapping); - try { - analyzePersistent(); - } catch (Exception e) { - throw new RuntimeException("Error occurred while analyzing the persistent class, :" + e.getMessage()); - } - this.createSchema(); - MappingManager mappingManager = new MappingManager(cassandraClient.getSession()); - mapper = mappingManager.mapper(persistentClass); - if (cassandraClient.getWriteConsistencyLevel() != null) { - mapper.setDefaultDeleteOptions(Mapper.Option.consistencyLevel(ConsistencyLevel.valueOf(cassandraClient.getWriteConsistencyLevel()))); - mapper.setDefaultSaveOptions(Mapper.Option.consistencyLevel(ConsistencyLevel.valueOf(cassandraClient.getWriteConsistencyLevel()))); - } - if (cassandraClient.getReadConsistencyLevel() != null) { - mapper.setDefaultGetOptions(Mapper.Option.consistencyLevel(ConsistencyLevel.valueOf(cassandraClient.getReadConsistencyLevel()))); - } - } - - /** - * {@inheritDoc} - * - * @param key - * @param value - */ - @Override - public void put(Object key, Persistent value) { - LOG.debug("Object is saved with key : {} and value : {}", key, value); - mapper.save((T) value); - } - - /** - * {@inheritDoc} - * - * @param key - * @return - */ - @Override - public T get(Object key) { - T object = mapper.get(key); - if (object != null) { - LOG.debug("Object is found for key : {}", key); - } else { - LOG.debug("Object is not found for key : {}", key); - } - return object; - } - - /** - * {@inheritDoc} - * - * @param key - * @return - */ - @Override - public boolean delete(Object key) { - LOG.debug("Object is deleted for key : {}", key); - mapper.delete(key); - return true; - } - - /** - * 
{@inheritDoc} - * - * @param key - * @param fields - * @return - */ - @Override - public Persistent get(Object key, String[] fields) { - if (fields == null) { - fields = getFields(); - } - String cqlQuery = CassandraQueryFactory.getSelectObjectWithFieldsQuery(mapping, fields); - SimpleStatement statement = new SimpleStatement(cqlQuery, key); - if (readConsistencyLevel != null) { - statement.setConsistencyLevel(ConsistencyLevel.valueOf(readConsistencyLevel)); - } - ResultSet results = client.getSession().execute(statement); - Result<T> objects = mapper.map(results); - List<T> objectList = objects.all(); - if (objectList != null) { - LOG.debug("Object is found for key : {}", key); - return objectList.get(0); - } - LOG.debug("Object is not found for key : {}", key); - return null; - } - - /** - * {@inheritDoc} - * - * @throws Exception - */ - @Override - protected void analyzePersistent() throws Exception { - userDefineTypeMaps = new HashMap<>(); - for (Field field : mapping.getFieldList()) { - String fieldType = field.getType(); - if (fieldType.contains("frozen")) { - String udtType = fieldType.substring(fieldType.indexOf("<") + 1, fieldType.indexOf(">")); - String createQuery = CassandraQueryFactory.getCreateUDTTypeForNative(mapping, persistentClass, udtType, field.getFieldName()); - userDefineTypeMaps.put(udtType, createQuery); - } - } - } - - /** - * {@inheritDoc} - * - * @param query - * @return - */ - @Override - public boolean updateByQuery(Query query) { - List<Object> objectArrayList = new ArrayList<>(); - String cqlQuery = CassandraQueryFactory.getUpdateByQueryForNative(mapping, query, objectArrayList); - ResultSet results; - SimpleStatement statement; - if (objectArrayList.size() == 0) { - statement = new SimpleStatement(cqlQuery); - } else { - statement = new SimpleStatement(cqlQuery, objectArrayList.toArray()); - } - if (writeConsistencyLevel != null) { - statement.setConsistencyLevel(ConsistencyLevel.valueOf(writeConsistencyLevel)); - } - results = 
client.getSession().execute(statement); - return results.wasApplied(); - } - - /** - * {@inheritDoc} - * - * @param dataStore - * @param query - * @return - */ - @Override - public org.apache.gora.query.Result execute(DataStore dataStore, Query query) { - List<Object> objectArrayList = new ArrayList<>(); - String[] fields = query.getFields(); - if (fields != null) { - fields = (String[]) ArrayUtils.addAll(fields, mapping.getAllKeys()); - } else { - fields = mapping.getAllFieldsIncludingKeys(); - } - CassandraResultSet<K, T> cassandraResult = new CassandraResultSet<>(dataStore, query); - String cqlQuery = CassandraQueryFactory.getExecuteQuery(mapping, query, objectArrayList, fields); - ResultSet results; - if (objectArrayList.size() == 0) { - results = client.getSession().execute(cqlQuery); - } else { - results = client.getSession().execute(cqlQuery, objectArrayList.toArray()); - } - Result<T> objects = mapper.map(results); - Iterator iterator = objects.iterator(); - while (iterator.hasNext()) { - T result = (T) iterator.next(); - K key = getKey(result); - cassandraResult.addResultElement(key, result); - } - return cassandraResult; - } - - private K getKey(T object) { - String keyField = null; - for (Field field : mapping.getFieldList()) { - boolean isPrimaryKey = Boolean.parseBoolean(field.getProperty("primarykey")); - if (isPrimaryKey) { - keyField = field.getFieldName(); - break; - } - } - K key; - Method keyMethod = null; - try { - for (Method method : this.persistentClass.getMethods()) { - if (method.getName().equalsIgnoreCase("get" + keyField)) { - keyMethod = method; - } - } - key = (K) keyMethod.invoke(object); - } catch (Exception e) { - try { - key = (K) this.persistentClass.getField(keyField).get(object); - } catch (Exception e1) { - throw new RuntimeException("Field" + keyField + " is not accessible in " + persistentClass + " : " + e1.getMessage()); - } - } - return key; - } -} 
http://git-wip-us.apache.org/repos/asf/gora/blob/89683c74/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/package-info.java ---------------------------------------------------------------------- diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/package-info.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/package-info.java deleted file mode 100644 index ce1e3e7..0000000 --- a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/package-info.java +++ /dev/null @@ -1,21 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * This package contains Cassandra store related util classes for serializer. 
- */ -package org.apache.gora.cassandra.serializers; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/gora/blob/89683c74/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java ---------------------------------------------------------------------- diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java deleted file mode 100644 index f672884..0000000 --- a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java +++ /dev/null @@ -1,535 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.gora.cassandra.store; - -import com.datastax.driver.core.Cluster; -import com.datastax.driver.core.ConsistencyLevel; -import com.datastax.driver.core.DataType; -import com.datastax.driver.core.HostDistance; -import com.datastax.driver.core.PoolingOptions; -import com.datastax.driver.core.ProtocolOptions; -import com.datastax.driver.core.ProtocolVersion; -import com.datastax.driver.core.QueryOptions; -import com.datastax.driver.core.Session; -import com.datastax.driver.core.SocketOptions; -import com.datastax.driver.core.TypeCodec; -import com.datastax.driver.core.policies.ConstantReconnectionPolicy; -import com.datastax.driver.core.policies.DCAwareRoundRobinPolicy; -import com.datastax.driver.core.policies.DefaultRetryPolicy; -import com.datastax.driver.core.policies.DowngradingConsistencyRetryPolicy; -import com.datastax.driver.core.policies.ExponentialReconnectionPolicy; -import com.datastax.driver.core.policies.FallthroughRetryPolicy; -import com.datastax.driver.core.policies.LatencyAwarePolicy; -import com.datastax.driver.core.policies.LoggingRetryPolicy; -import com.datastax.driver.core.policies.RoundRobinPolicy; -import com.datastax.driver.core.policies.TokenAwarePolicy; -import com.datastax.driver.extras.codecs.arrays.DoubleArrayCodec; -import com.datastax.driver.extras.codecs.arrays.FloatArrayCodec; -import com.datastax.driver.extras.codecs.arrays.IntArrayCodec; -import com.datastax.driver.extras.codecs.arrays.LongArrayCodec; -import com.datastax.driver.extras.codecs.arrays.ObjectArrayCodec; -import com.datastax.driver.extras.codecs.date.SimpleDateCodec; -import com.datastax.driver.extras.codecs.date.SimpleTimestampCodec; -import com.datastax.driver.extras.codecs.jdk8.OptionalCodec; -import org.apache.gora.cassandra.bean.Field; -import org.jdom.Document; -import org.jdom.Element; -import org.jdom.JDOMException; -import org.jdom.input.SAXBuilder; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import 
java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.Locale; -import java.util.Properties; - -/** - * This class provides the Cassandra Client Connection. - * Initialize the Cassandra Connection according to the Properties. - */ -public class CassandraClient { - - private static final Logger LOG = LoggerFactory.getLogger(CassandraClient.class); - - private Cluster cluster; - - private Session session; - - private CassandraMapping mapping; - - private String readConsistencyLevel; - - private String writeConsistencyLevel; - - /** - * @return session opened by initialize, or null before initialize has run - */ - public Session getSession() { - return session; - } - - /** - * @return cluster built by initialize, or null before initialize has run - */ - public Cluster getCluster() { - return cluster; - } - - /** - * Builds the cluster from the properties, registers the custom and optional codecs, reads both consistency levels and connects the session. - * - * @param properties Cassandra store properties - * @param mapping Cassandra mapping; its field list decides which collection codecs are registered - * @throws Exception if the initialization fails - */ - void initialize(Properties properties, CassandraMapping mapping) throws Exception { - Cluster.Builder builder = Cluster.builder(); - List<String> codecs = readCustomCodec(properties); - builder = populateSettings(builder, properties); - this.mapping = mapping; - this.cluster = builder.build(); - if (codecs != null) { - registerCustomCodecs(codecs); - } - readConsistencyLevel = properties.getProperty(CassandraStoreParameters.READ_CONSISTENCY_LEVEL); - writeConsistencyLevel = properties.getProperty(CassandraStoreParameters.WRITE_CONSISTENCY_LEVEL); - registerOptionalCodecs(); - this.session = this.cluster.connect(); - } - - /** - * Registers Optional codecs for the native types, the array and time codecs, and an Optional codec for every list, set and map column type found in the mapping. - */ - private void registerOptionalCodecs() { - // Optional Codecs for natives - this.cluster.getConfiguration().getCodecRegistry().register(new OptionalCodec<>(TypeCodec.ascii())); - this.cluster.getConfiguration().getCodecRegistry().register(new OptionalCodec<>(TypeCodec.bigint())); - this.cluster.getConfiguration().getCodecRegistry().register(new OptionalCodec<>(TypeCodec.blob())); - this.cluster.getConfiguration().getCodecRegistry().register(new OptionalCodec<>(TypeCodec.cboolean())); - this.cluster.getConfiguration().getCodecRegistry().register(new OptionalCodec<>(TypeCodec.cdouble())); - this.cluster.getConfiguration().getCodecRegistry().register(new 
OptionalCodec<>(TypeCodec.cfloat())); - this.cluster.getConfiguration().getCodecRegistry().register(new OptionalCodec<>(TypeCodec.cint())); - this.cluster.getConfiguration().getCodecRegistry().register(new OptionalCodec<>(TypeCodec.counter())); - this.cluster.getConfiguration().getCodecRegistry().register(new OptionalCodec<>(TypeCodec.date())); - this.cluster.getConfiguration().getCodecRegistry().register(new OptionalCodec<>(TypeCodec.decimal())); - this.cluster.getConfiguration().getCodecRegistry().register(new OptionalCodec<>(TypeCodec.inet())); - this.cluster.getConfiguration().getCodecRegistry().register(new OptionalCodec<>(TypeCodec.smallInt())); - this.cluster.getConfiguration().getCodecRegistry().register(new OptionalCodec<>(TypeCodec.time())); - this.cluster.getConfiguration().getCodecRegistry().register(new OptionalCodec<>(TypeCodec.timestamp())); - this.cluster.getConfiguration().getCodecRegistry().register(new OptionalCodec<>(TypeCodec.timeUUID())); - this.cluster.getConfiguration().getCodecRegistry().register(new OptionalCodec<>(TypeCodec.tinyInt())); - this.cluster.getConfiguration().getCodecRegistry().register(new OptionalCodec<>(TypeCodec.varint())); - this.cluster.getConfiguration().getCodecRegistry().register(new OptionalCodec<>(TypeCodec.varchar())); - this.cluster.getConfiguration().getCodecRegistry().register(new OptionalCodec<>(TypeCodec.uuid())); - // Optional Array Codecs - this.cluster.getConfiguration().getCodecRegistry().register(new IntArrayCodec()); - this.cluster.getConfiguration().getCodecRegistry().register(new DoubleArrayCodec()); - this.cluster.getConfiguration().getCodecRegistry().register(new FloatArrayCodec()); - this.cluster.getConfiguration().getCodecRegistry().register(new LongArrayCodec()); - this.cluster.getConfiguration().getCodecRegistry().register(new ObjectArrayCodec<>( - DataType.list(DataType.varchar()), - String[].class, - TypeCodec.varchar())); - // Optional Time Codecs - 
this.cluster.getConfiguration().getCodecRegistry().register(new SimpleDateCodec()); - this.cluster.getConfiguration().getCodecRegistry().register(new SimpleTimestampCodec()); - - for (Field field : this.mapping.getFieldList()) { - String columnType = field.getType().toLowerCase(Locale.ENGLISH); - //http://docs.datastax.com/en/cql/3.3/cql/cql_reference/cql_data_types_c.html - /* element type names are trimmed so that spaces inside the angle brackets do not break the codec lookup */ - if (columnType.contains("list")) { - columnType = columnType.substring(columnType.indexOf("<") + 1, columnType.indexOf(">")).trim(); - this.cluster.getConfiguration().getCodecRegistry().register(new OptionalCodec<>(TypeCodec.list(getTypeCodec(columnType)))); - } else if (columnType.contains("set")) { - columnType = columnType.substring(columnType.indexOf("<") + 1, columnType.indexOf(">")).trim(); - this.cluster.getConfiguration().getCodecRegistry().register(new OptionalCodec<>(TypeCodec.set(getTypeCodec(columnType)))); - } else if (columnType.contains("map")) { - String[] columnTypes = columnType.substring(columnType.indexOf("<") + 1, columnType.indexOf(">")).split(","); - /* map<k,v> needs the plain key and value codecs; wrapping both in set codecs registered a codec for map<set<k>,set<v>> instead */ - this.cluster.getConfiguration().getCodecRegistry().register(new OptionalCodec<>(TypeCodec.map(getTypeCodec(columnTypes[0].trim()), getTypeCodec(columnTypes[1].trim())))); - } - } - } - - /** - * Returns the driver codec for a lower-case CQL type name. - * - * @param columnType CQL type name, e.g. text or bigint - * @return codec of the type - * @throws RuntimeException if the type name is not supported - */ - private TypeCodec getTypeCodec(String columnType) { - TypeCodec typeCodec; - switch (columnType) { - case "ascii": - typeCodec = TypeCodec.ascii(); - break; - case "bigint": - typeCodec = TypeCodec.bigint(); - break; - case "blob": - typeCodec = TypeCodec.blob(); - break; - case "boolean": - typeCodec = TypeCodec.cboolean(); - break; - case "counter": - typeCodec = TypeCodec.counter(); - break; - case "date": - typeCodec = TypeCodec.date(); - break; - case "decimal": - typeCodec = TypeCodec.decimal(); - break; - case "double": - typeCodec = TypeCodec.cdouble(); - break; - case "float": - typeCodec = TypeCodec.cfloat(); - break; - case "inet": - typeCodec = TypeCodec.inet(); - break; - case "int": - typeCodec = TypeCodec.cint(); - break; - 
case "smallint": - typeCodec = TypeCodec.smallInt(); - break; - case "time": - typeCodec = TypeCodec.time(); - break; - case "timestamp": - typeCodec = TypeCodec.timestamp(); - break; - case "timeuuid": - typeCodec = TypeCodec.timeUUID(); - break; - case "tinyint": - typeCodec = TypeCodec.tinyInt(); - break; - case "uuid": - typeCodec = TypeCodec.uuid(); - break; - case "varint": - typeCodec = TypeCodec.varint(); - break; - case "varchar": - case "text": - typeCodec = TypeCodec.varchar(); - break; - default: - LOG.error("Unsupported Cassandra datatype: {} ", columnType); - throw new RuntimeException("Unsupported Cassandra datatype: " + columnType); - } - return typeCodec; - } - - private Cluster.Builder populateSettings(Cluster.Builder builder, Properties properties) { - String serversParam = properties.getProperty(CassandraStoreParameters.CASSANDRA_SERVERS); - String[] servers = serversParam.split(","); - for (String server : servers) { - builder = builder.addContactPoint(server); - } - String portProp = properties.getProperty(CassandraStoreParameters.PORT); - if (portProp != null) { - builder = builder.withPort(Integer.parseInt(portProp)); - } - String clusterNameProp = properties.getProperty(CassandraStoreParameters.CLUSTER_NAME); - if (clusterNameProp != null) { - builder = builder.withClusterName(clusterNameProp); - } - String compressionProp = properties.getProperty(CassandraStoreParameters.COMPRESSION); - if (compressionProp != null) { - builder = builder.withCompression(ProtocolOptions.Compression.valueOf(compressionProp)); - } - builder = this.populateCredentials(properties, builder); - builder = this.populateLoadBalancingProp(properties, builder); - String enableJMXProp = properties.getProperty(CassandraStoreParameters.ENABLE_JMX_REPORTING); - if (!Boolean.parseBoolean(enableJMXProp)) { - builder = builder.withoutJMXReporting(); - } - String enableMetricsProp = properties.getProperty(CassandraStoreParameters.ENABLE_METRICS); - if 
(!Boolean.parseBoolean(enableMetricsProp)) { - builder = builder.withoutMetrics(); - } - builder = this.populatePoolingSettings(properties, builder); - String versionProp = properties.getProperty(CassandraStoreParameters.PROTOCOL_VERSION); - if (versionProp != null) { - builder = builder.withProtocolVersion(ProtocolVersion.fromInt(Integer.parseInt(versionProp))); - } - builder = this.populateQueryOptions(properties, builder); - builder = this.populateReconnectPolicy(properties, builder); - builder = this.populateRetrytPolicy(properties, builder); - builder = this.populateSocketOptions(properties, builder); - String enableSSLProp = properties.getProperty(CassandraStoreParameters.ENABLE_SSL); - if (enableSSLProp != null) { - if (Boolean.parseBoolean(enableSSLProp)) { - builder = builder.withSSL(); - } - } - return builder; - } - - private Cluster.Builder populateLoadBalancingProp(Properties properties, Cluster.Builder builder) { - String loadBalancingProp = properties.getProperty(CassandraStoreParameters.LOAD_BALANCING_POLICY); - if (loadBalancingProp != null) { - switch (loadBalancingProp) { - case "LatencyAwareRoundRobinPolicy": - builder = builder.withLoadBalancingPolicy(LatencyAwarePolicy.builder(new RoundRobinPolicy()).build()); - break; - case "RoundRobinPolicy": - builder = builder.withLoadBalancingPolicy(new RoundRobinPolicy()); - break; - case "DCAwareRoundRobinPolicy": { - String dataCenter = properties.getProperty(CassandraStoreParameters.DATA_CENTER); - boolean allowRemoteDCsForLocalConsistencyLevel = Boolean.parseBoolean( - properties.getProperty(CassandraStoreParameters.ALLOW_REMOTE_DCS_FOR_LOCAL_CONSISTENCY_LEVEL)); - if (dataCenter != null && !dataCenter.isEmpty()) { - if (allowRemoteDCsForLocalConsistencyLevel) { - builder = builder.withLoadBalancingPolicy( - DCAwareRoundRobinPolicy.builder().withLocalDc(dataCenter) - .allowRemoteDCsForLocalConsistencyLevel().build()); - } else { - builder = builder.withLoadBalancingPolicy( - 
DCAwareRoundRobinPolicy.builder().withLocalDc(dataCenter).build()); - } - } else { - if (allowRemoteDCsForLocalConsistencyLevel) { - builder = builder.withLoadBalancingPolicy( - (DCAwareRoundRobinPolicy.builder().allowRemoteDCsForLocalConsistencyLevel().build())); - } else { - builder = builder.withLoadBalancingPolicy((DCAwareRoundRobinPolicy.builder().build())); - } - } - break; - } - case "TokenAwareRoundRobinPolicy": - builder = builder.withLoadBalancingPolicy(new TokenAwarePolicy(new RoundRobinPolicy())); - break; - case "TokenAwareDCAwareRoundRobinPolicy": { - String dataCenter = properties.getProperty(CassandraStoreParameters.DATA_CENTER); - boolean allowRemoteDCsForLocalConsistencyLevel = Boolean.parseBoolean( - properties.getProperty(CassandraStoreParameters.ALLOW_REMOTE_DCS_FOR_LOCAL_CONSISTENCY_LEVEL)); - if (dataCenter != null && !dataCenter.isEmpty()) { - if (allowRemoteDCsForLocalConsistencyLevel) { - builder = builder.withLoadBalancingPolicy(new TokenAwarePolicy( - DCAwareRoundRobinPolicy.builder().withLocalDc(dataCenter) - .allowRemoteDCsForLocalConsistencyLevel().build())); - } else { - builder = builder.withLoadBalancingPolicy(new TokenAwarePolicy( - DCAwareRoundRobinPolicy.builder().withLocalDc(dataCenter).build())); - } - } else { - if (allowRemoteDCsForLocalConsistencyLevel) { - builder = builder.withLoadBalancingPolicy(new TokenAwarePolicy( - DCAwareRoundRobinPolicy.builder().allowRemoteDCsForLocalConsistencyLevel().build())); - } else { - builder = builder.withLoadBalancingPolicy( - new TokenAwarePolicy(DCAwareRoundRobinPolicy.builder().build())); - } - } - break; - } - default: - LOG.error("Unsupported Cassandra load balancing policy: {} ", loadBalancingProp); - break; - } - } - return builder; - } - - private Cluster.Builder populateCredentials(Properties properties, Cluster.Builder builder) { - String usernameProp = properties.getProperty(CassandraStoreParameters.USERNAME); - String passwordProp = 
properties.getProperty(CassandraStoreParameters.PASSWORD); - if (usernameProp != null) { - builder = builder.withCredentials(usernameProp, passwordProp); - } - return builder; - } - - private Cluster.Builder populatePoolingSettings(Properties properties, Cluster.Builder builder) { - String localCoreConnectionsPerHost = properties.getProperty(CassandraStoreParameters.LOCAL_CORE_CONNECTIONS_PER_HOST); - String remoteCoreConnectionsPerHost = properties.getProperty(CassandraStoreParameters.REMOTE_CORE_CONNECTIONS_PER_HOST); - String localMaxConnectionsPerHost = properties.getProperty(CassandraStoreParameters.LOCAL_MAX_CONNECTIONS_PER_HOST); - String remoteMaxConnectionsPerHost = properties.getProperty(CassandraStoreParameters.REMOTE_MAX_CONNECTIONS_PER_HOST); - String localNewConnectionThreshold = properties.getProperty(CassandraStoreParameters.LOCAL_NEW_CONNECTION_THRESHOLD); - String remoteNewConnectionThreshold = properties.getProperty(CassandraStoreParameters.REMOTE_NEW_CONNECTION_THRESHOLD); - String localMaxRequestsPerConnection = properties.getProperty(CassandraStoreParameters.LOCAL_MAX_REQUESTS_PER_CONNECTION); - String remoteMaxRequestsPerConnection = properties.getProperty(CassandraStoreParameters.REMOTE_MAX_REQUESTS_PER_CONNECTION); - PoolingOptions options = new PoolingOptions(); - if (localCoreConnectionsPerHost != null) { - options.setCoreConnectionsPerHost(HostDistance.LOCAL, Integer.parseInt(localCoreConnectionsPerHost)); - } - if (remoteCoreConnectionsPerHost != null) { - options.setCoreConnectionsPerHost(HostDistance.REMOTE, Integer.parseInt(remoteCoreConnectionsPerHost)); - } - if (localMaxConnectionsPerHost != null) { - options.setMaxConnectionsPerHost(HostDistance.LOCAL, Integer.parseInt(localMaxConnectionsPerHost)); - } - if (remoteMaxConnectionsPerHost != null) { - options.setMaxConnectionsPerHost(HostDistance.REMOTE, Integer.parseInt(remoteMaxConnectionsPerHost)); - } - if (localNewConnectionThreshold != null) { - 
options.setNewConnectionThreshold(HostDistance.LOCAL, Integer.parseInt(localNewConnectionThreshold)); - } - if (remoteNewConnectionThreshold != null) { - options.setNewConnectionThreshold(HostDistance.REMOTE, Integer.parseInt(remoteNewConnectionThreshold)); - } - if (localMaxRequestsPerConnection != null) { - options.setMaxRequestsPerConnection(HostDistance.LOCAL, Integer.parseInt(localMaxRequestsPerConnection)); - } - if (remoteMaxRequestsPerConnection != null) { - options.setMaxRequestsPerConnection(HostDistance.REMOTE, Integer.parseInt(remoteMaxRequestsPerConnection)); - } - builder = builder.withPoolingOptions(options); - return builder; - } - - private Cluster.Builder populateQueryOptions(Properties properties, Cluster.Builder builder) { - String consistencyLevelProp = properties.getProperty(CassandraStoreParameters.CONSISTENCY_LEVEL); - String serialConsistencyLevelProp = properties.getProperty(CassandraStoreParameters.SERIAL_CONSISTENCY_LEVEL); - String fetchSize = properties.getProperty(CassandraStoreParameters.FETCH_SIZE); - QueryOptions options = new QueryOptions(); - if (consistencyLevelProp != null) { - options.setConsistencyLevel(ConsistencyLevel.valueOf(consistencyLevelProp)); - } - if (serialConsistencyLevelProp != null) { - options.setSerialConsistencyLevel(ConsistencyLevel.valueOf(serialConsistencyLevelProp)); - } - if (fetchSize != null) { - options.setFetchSize(Integer.parseInt(fetchSize)); - } - return builder.withQueryOptions(options); - } - - private Cluster.Builder populateReconnectPolicy(Properties properties, Cluster.Builder builder) { - String reconnectionPolicy = properties.getProperty(CassandraStoreParameters.RECONNECTION_POLICY); - if (reconnectionPolicy != null) { - switch (reconnectionPolicy) { - case "ConstantReconnectionPolicy": { - String constantReconnectionPolicyDelay = properties.getProperty(CassandraStoreParameters.CONSTANT_RECONNECTION_POLICY_DELAY); - ConstantReconnectionPolicy policy = new 
ConstantReconnectionPolicy(Long.parseLong(constantReconnectionPolicyDelay)); - builder = builder.withReconnectionPolicy(policy); - break; - } - case "ExponentialReconnectionPolicy": { - String exponentialReconnectionPolicyBaseDelay = properties.getProperty(CassandraStoreParameters.EXPONENTIAL_RECONNECTION_POLICY_BASE_DELAY); - String exponentialReconnectionPolicyMaxDelay = properties.getProperty(CassandraStoreParameters.EXPONENTIAL_RECONNECTION_POLICY_MAX_DELAY); - - ExponentialReconnectionPolicy policy = new ExponentialReconnectionPolicy(Long.parseLong(exponentialReconnectionPolicyBaseDelay), - Long.parseLong(exponentialReconnectionPolicyMaxDelay)); - builder = builder.withReconnectionPolicy(policy); - break; - } - default: - LOG.error("Unsupported reconnection policy : {} ", reconnectionPolicy); - } - } - return builder; - } - - private Cluster.Builder populateRetrytPolicy(Properties properties, Cluster.Builder builder) { - String retryPolicy = properties.getProperty(CassandraStoreParameters.RETRY_POLICY); - if (retryPolicy != null) { - switch (retryPolicy) { - case "DefaultRetryPolicy": - builder = builder.withRetryPolicy(DefaultRetryPolicy.INSTANCE); - break; - case "DowngradingConsistencyRetryPolicy": - builder = builder.withRetryPolicy(DowngradingConsistencyRetryPolicy.INSTANCE); - break; - case "FallthroughRetryPolicy": - builder = builder.withRetryPolicy(FallthroughRetryPolicy.INSTANCE); - break; - case "LoggingDefaultRetryPolicy": - builder = builder.withRetryPolicy(new LoggingRetryPolicy(DefaultRetryPolicy.INSTANCE)); - break; - case "LoggingDowngradingConsistencyRetryPolicy": - builder = builder.withRetryPolicy(new LoggingRetryPolicy(DowngradingConsistencyRetryPolicy.INSTANCE)); - break; - case "LoggingFallthroughRetryPolicy": - builder = builder.withRetryPolicy(new LoggingRetryPolicy(FallthroughRetryPolicy.INSTANCE)); - break; - default: - LOG.error("Unsupported retry policy : {} ", retryPolicy); - break; - } - } - return builder; - } - - private 
Cluster.Builder populateSocketOptions(Properties properties, Cluster.Builder builder) { - String connectionTimeoutMillisProp = properties.getProperty(CassandraStoreParameters.CONNECTION_TIMEOUT_MILLIS); - String keepAliveProp = properties.getProperty(CassandraStoreParameters.KEEP_ALIVE); - String readTimeoutMillisProp = properties.getProperty(CassandraStoreParameters.READ_TIMEOUT_MILLIS); - String receiveBufferSizeProp = properties.getProperty(CassandraStoreParameters.RECEIVER_BUFFER_SIZE); - String reuseAddress = properties.getProperty(CassandraStoreParameters.REUSE_ADDRESS); - String sendBufferSize = properties.getProperty(CassandraStoreParameters.SEND_BUFFER_SIZE); - String soLinger = properties.getProperty(CassandraStoreParameters.SO_LINGER); - String tcpNoDelay = properties.getProperty(CassandraStoreParameters.TCP_NODELAY); - SocketOptions options = new SocketOptions(); - if (connectionTimeoutMillisProp != null) { - options.setConnectTimeoutMillis(Integer.parseInt(connectionTimeoutMillisProp)); - } - if (keepAliveProp != null) { - options.setKeepAlive(Boolean.parseBoolean(keepAliveProp)); - } - if (readTimeoutMillisProp != null) { - options.setReadTimeoutMillis(Integer.parseInt(readTimeoutMillisProp)); - } - if (receiveBufferSizeProp != null) { - options.setReceiveBufferSize(Integer.parseInt(receiveBufferSizeProp)); - } - if (reuseAddress != null) { - options.setReuseAddress(Boolean.parseBoolean(reuseAddress)); - } - if (sendBufferSize != null) { - options.setSendBufferSize(Integer.parseInt(sendBufferSize)); - } - if (soLinger != null) { - options.setSoLinger(Integer.parseInt(soLinger)); - } - if (tcpNoDelay != null) { - options.setTcpNoDelay(Boolean.parseBoolean(tcpNoDelay)); - } - return builder.withSocketOptions(options); - } - - private List<String> readCustomCodec(Properties properties) throws JDOMException, IOException { - String filename = properties.getProperty(CassandraStoreParameters.CUSTOM_CODEC_FILE); - if (filename != null) { - List<String> codecs 
= new ArrayList<>(); - SAXBuilder builder = new SAXBuilder(); - Document doc = builder.build(getClass().getClassLoader().getResourceAsStream(filename)); - List<Element> codecElementList = doc.getRootElement().getChildren("codec"); - for (Element codec : codecElementList) { - codecs.add(codec.getValue()); - } - return codecs; - } - return null; - } - - /** - * This method returns configured read consistency level. - * @return read Consistency level - */ - public String getReadConsistencyLevel() { - return readConsistencyLevel; - } - - /** - * This method returns configured write consistency level. - * @return write Consistency level - */ - public String getWriteConsistencyLevel() { - return writeConsistencyLevel; - } - - - public void close() { - this.session.close(); - this.cluster.close(); - } - - private void registerCustomCodecs(List<String> codecs) throws Exception { - for (String codec : codecs) { - this.cluster.getConfiguration().getCodecRegistry().register((TypeCodec<?>) Class.forName(codec).newInstance()); - } - } - -} http://git-wip-us.apache.org/repos/asf/gora/blob/89683c74/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraMapping.java ---------------------------------------------------------------------- diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraMapping.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraMapping.java deleted file mode 100644 index 7d5ac05..0000000 --- a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraMapping.java +++ /dev/null @@ -1,242 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.gora.cassandra.store; - -import org.apache.gora.cassandra.bean.CassandraKey; -import org.apache.gora.cassandra.bean.Field; -import org.apache.gora.cassandra.bean.KeySpace; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -/** - * This class represents the Cassandra Mapping. - */ -public class CassandraMapping { - - private static final String PRIMARY_KEY = "primarykey"; - private CassandraKey cassandraKey; - private Map<String, String> tableProperties; - private Class keyClass; - private Class persistentClass; - private KeySpace keySpace; - private List<Field> fieldList; - private Field inlinedDefinedPartitionKey; - private String coreName; - - /** - * Constructor of the class - */ - CassandraMapping() { - this.fieldList = new ArrayList<>(); - this.tableProperties = new HashMap<>(); - } - - /** - * This method returns the KeySpace in the mapping file, - * - * @return Key space {@link KeySpace} - */ - public KeySpace getKeySpace() { - return keySpace; - } - - /** - * This method set the KeySpace in the Cassandra mapping. - * - * @param keySpace Key space {@link KeySpace} - */ - void setKeySpace(KeySpace keySpace) { - this.keySpace = keySpace; - } - - /** - * Thi method returns only the fields which belongs only for the Persistent Object. 
- * - * @return List of Fields - */ - public List<Field> getFieldList() { - return fieldList; - } - - /** - * This method returns the Persistent Object's Field from the mapping, according to the FieldName. - * - * @param fieldName Field Name - * @return Field {@link Field} - */ - public Field getFieldFromFieldName(String fieldName) { - for (Field field1 : fieldList) { - if (field1.getFieldName().equalsIgnoreCase(fieldName)) { - return field1; - } - } - return null; - } - - /** - * This method returns the Persistent Object's Field from the mapping, according to the ColumnName. - * - * @param columnName Column Name - * @return Field {@link Field} - */ - public Field getFieldFromColumnName(String columnName) { - for (Field field1 : fieldList) { - if (field1.getColumnName().equalsIgnoreCase(columnName)) { - return field1; - } - } - return null; - } - - /** - * This method returns the Field Names - * - * @return array of Field Names - */ - public String[] getFieldNames() { - List<String> fieldNames = new ArrayList<>(fieldList.size()); - for (Field field : fieldList) { - fieldNames.add(field.getFieldName()); - } - String[] fieldNameArray = new String[fieldNames.size()]; - return fieldNames.toArray(fieldNameArray); - } - - /** - * This method returns partition keys - * - * @return partitionKeys - */ - public String[] getAllFieldsIncludingKeys() { - List<String> fieldNames = new ArrayList<>(fieldList.size()); - for (Field field : fieldList) { - fieldNames.add(field.getFieldName()); - } - if (cassandraKey != null) { - for (Field field : cassandraKey.getFieldList()) { - fieldNames.add(field.getFieldName()); - } - } - String[] fieldNameArray = new String[fieldNames.size()]; - return fieldNames.toArray(fieldNameArray); - } - - /** - * This method return all the fields which involves with partition keys, Including composite Keys - * @return field Names - */ - public String[] getAllKeys() { - List<String> fieldNames = new ArrayList<>(); - Field keyField = 
getInlinedDefinedPartitionKey(); - if (cassandraKey != null) { - for (Field field : cassandraKey.getFieldList()) { - fieldNames.add(field.getFieldName()); - } - } else { - fieldNames.add(keyField.getFieldName()); - } - String[] fieldNameArray = new String[fieldNames.size()]; - return fieldNames.toArray(fieldNameArray); - } - - public CassandraKey getCassandraKey() { - return cassandraKey; - } - - void setCassandraKey(CassandraKey cassandraKey) { - this.cassandraKey = cassandraKey; - } - - public String getCoreName() { - return coreName; - } - - void setCoreName(String coreName) { - this.coreName = coreName; - } - - void addCassandraField(Field field) { - this.fieldList.add(field); - } - - void addProperty(String key, String value) { - this.tableProperties.put(key, value); - } - - public String getProperty(String key) { - return this.tableProperties.get(key); - } - - private Field getDefaultCassandraKey() { - Field field = new Field(); - field.setFieldName("defaultId"); - field.setColumnName("defaultId"); - field.setType("varchar"); - field.addProperty("primarykey", "true"); - return field; - } - - public Class getKeyClass() { - return keyClass; - } - - public void setKeyClass(Class keyClass) { - this.keyClass = keyClass; - } - - public Class getPersistentClass() { - return persistentClass; - } - - void setPersistentClass(Class persistentClass) { - this.persistentClass = persistentClass; - } - - /** - * This method return the Inlined defined Partition Key, - * If there isn't any inlined define partition keys, - * this method returns default predefined partition key "defaultId". 
- * - * @return Partition Key {@link Field} - */ - public Field getInlinedDefinedPartitionKey() { - if (inlinedDefinedPartitionKey != null) { - return inlinedDefinedPartitionKey; - } else { - for (Field field : fieldList) { - if (Boolean.parseBoolean(field.getProperty(PRIMARY_KEY))) { - inlinedDefinedPartitionKey = field; - break; - } - } - if (inlinedDefinedPartitionKey == null) { - return getDefaultCassandraKey(); - } - return inlinedDefinedPartitionKey; - } - } - - void finalized() { - Field field = getInlinedDefinedPartitionKey(); - if (!fieldList.contains(field) && cassandraKey == null) { - fieldList.add(field); - } - } -}
