http://git-wip-us.apache.org/repos/asf/gora/blob/89683c74/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraMappingBuilder.java ---------------------------------------------------------------------- diff --git a/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraMappingBuilder.java b/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraMappingBuilder.java new file mode 100644 index 0000000..a5ada56 --- /dev/null +++ b/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraMappingBuilder.java @@ -0,0 +1,321 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.gora.cassandra.store; + +import org.apache.gora.cassandra.bean.CassandraKey; +import org.apache.gora.cassandra.bean.ClusterKeyField; +import org.apache.gora.cassandra.bean.Field; +import org.apache.gora.cassandra.bean.KeySpace; +import org.apache.gora.cassandra.bean.PartitionKeyField; +import org.apache.gora.persistency.Persistent; +import org.jdom.Attribute; +import org.jdom.Document; +import org.jdom.Element; +import org.jdom.input.SAXBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Locale; + +/** + * This Class reads the Cassandra Mapping file and create tha Cassandra Mapping object. + * {@link org.apache.gora.cassandra.store.CassandraMapping} + */ +public class CassandraMappingBuilder<K, T extends Persistent> { + + private static final Logger LOG = LoggerFactory.getLogger(CassandraMappingBuilder.class); + + private CassandraStore dataStore; + + public CassandraMappingBuilder() { + } + + /** + * Constructor for builder to create the mapper. 
+ * + * @param store Cassandra Store + */ + CassandraMappingBuilder(final CassandraStore<K, T> store) { + this.dataStore = store; + } + + private static int getReplicationFactor(Element element) { + if (element == null) { + return 1; + } + String value = element.getAttributeValue("replicationFactor"); + if (value == null) { + return 1; + } else { + return Integer.parseInt(value); + } + } + + /** + * @param fileName mapping fileName + * @return All the Cassandra Mappings in the mapping file + * @throws Exception + */ + @SuppressWarnings("all") + public List<CassandraMapping> readMappingFile(File fileName) throws Exception { + List<CassandraMapping> mappings = new ArrayList<>(); + SAXBuilder builder = new SAXBuilder(); + Document doc = builder.build(fileName); + + List<Element> keyspaces = doc.getRootElement().getChildren("keyspace"); + List<Element> classes = doc.getRootElement().getChildren("class"); + List<Element> keys = doc.getRootElement().getChildren("cassandraKey"); + for (Element classElement : classes) { + CassandraMapping mapping = new CassandraMapping(); + processClass(mapping, classElement); + mappings.add(mapping); + } + + for (CassandraMapping mapping : mappings) { + for (Element keySpace : keyspaces) { + String keySpaceName = keySpace.getAttributeValue("name"); + if (keySpaceName.equals(mapping.getProperty("keyspace"))) { + processKeySpace(mapping, keySpace, keySpaceName); + break; + } + } + + for (Element cassandraKey : keys) { + String cassandraKeyName = cassandraKey.getAttributeValue("name"); + if (mapping.getProperty("keyClass").equals(cassandraKeyName)) { + processCassandraKeys(mapping, cassandraKey, cassandraKeyName); + } + } + mapping.finalized(); + } + return mappings; + } + + /** + * In this method we reads the mapping file and creates the Cassandra Mapping. 
+ * + * @param filename mapping file name + * @return @{@link CassandraMapping} + * @throws IOException + */ + @SuppressWarnings("all") + CassandraMapping readMapping(String filename) throws Exception { + CassandraMapping cassandraMapping = new CassandraMapping(); + Class keyClass = dataStore.getKeyClass(); + Class persistentClass = dataStore.getPersistentClass(); + SAXBuilder builder = new SAXBuilder(); + Document doc = builder.build(getClass().getClassLoader().getResourceAsStream(filename)); + + List<Element> keyspaces = doc.getRootElement().getChildren("keyspace"); + List<Element> classes = doc.getRootElement().getChildren("class"); + List<Element> keys = doc.getRootElement().getChildren("cassandraKey"); + + boolean classMatched = false; + for (Element classElement : classes) { + if (classElement.getAttributeValue("keyClass").equals( + keyClass.getCanonicalName()) + && classElement.getAttributeValue("name").equals( + persistentClass.getCanonicalName())) { + + classMatched = true; + processClass(cassandraMapping, classElement); + cassandraMapping.setKeyClass(dataStore.getKeyClass()); + cassandraMapping.setPersistentClass(dataStore.getPersistentClass()); + break; + } + LOG.warn("Check that 'keyClass' and 'name' parameters in gora-solr-mapping.xml " + + "match with intended values. 
A mapping mismatch has been found therefore " + + "no mapping has been initialized for class mapping at position " + + " {} in mapping file.", classes.indexOf(classElement)); + } + if (!classMatched) { + throw new RuntimeException("Check that 'keyClass' and 'name' parameters in " + filename + " no mapping has been initialized for " + persistentClass + "class mapping"); + } + + String keyspaceName = cassandraMapping.getProperty("keyspace"); + if (keyspaceName != null) { + KeySpace keyspace; + for (Element keyspaceElement : keyspaces) { + if (keyspaceName.equals(keyspaceElement.getAttributeValue("name"))) { + processKeySpace(cassandraMapping, keyspaceElement, keyspaceName); + break; + } + } + } else { + throw new RuntimeException("Couldn't find KeySpace in the Cassandra mapping. Please configure the cassandra mapping correctly."); + } + for (Element key : keys) { + if (keyClass.getName().equals(key.getAttributeValue("name"))) { + processCassandraKeys(cassandraMapping, key, keyClass.getName()); + break; + } + } + cassandraMapping.finalized(); + return cassandraMapping; + } + + private void processClass(CassandraMapping cassandraMapping, Element classElement) { + String tableName = classElement.getAttributeValue("table"); + cassandraMapping.setCoreName(tableName); + + List classAttributes = classElement.getAttributes(); + for (Object anAttributeList : classAttributes) { + Attribute attribute = (Attribute) anAttributeList; + String attributeName = attribute.getName(); + String attributeValue = attribute.getValue(); + cassandraMapping.addProperty(attributeName, attributeValue); + } + + List<Element> fields = classElement.getChildren("field"); + + for (Element field : fields) { + Field cassandraField = new Field(); + + List fieldAttributes = field.getAttributes(); + processAttributes(fieldAttributes, cassandraField); + cassandraMapping.addCassandraField(cassandraField); + } + } + + private void processKeySpace(CassandraMapping cassandraMapping, Element keyspaceElement, 
String keyspaceName) { + KeySpace keyspace = new KeySpace(); + List fieldAttributes = keyspaceElement.getAttributes(); + for (Object attributeObject : fieldAttributes) { + Attribute attribute = (Attribute) attributeObject; + String attributeName = attribute.getName(); + String attributeValue = attribute.getValue(); + switch (attributeName) { + case "name": + keyspace.setName(attributeValue); + break; + case "durableWrite": + keyspace.setDurableWritesEnabled(Boolean.parseBoolean(attributeValue)); + break; + default: + LOG.warn("{} attribute is Unsupported or Invalid, in {} Cassandra KeySpace. Please configure the cassandra mapping correctly.", new Object[]{attributeName, keyspaceName}); + break; + } + } + Element placementStrategy = keyspaceElement.getChild("placementStrategy"); + if (placementStrategy != null) { + switch (KeySpace.PlacementStrategy.valueOf(placementStrategy.getAttributeValue("name"))) { + case SimpleStrategy: + keyspace.setPlacementStrategy(KeySpace.PlacementStrategy.SimpleStrategy); + keyspace.setReplicationFactor(getReplicationFactor(placementStrategy)); + break; + case NetworkTopologyStrategy: + List<Element> dataCenters = placementStrategy.getChildren("datacenter"); + keyspace.setPlacementStrategy(KeySpace.PlacementStrategy.NetworkTopologyStrategy); + for (Element dataCenter : dataCenters) { + String dataCenterName = dataCenter.getAttributeValue("name"); + keyspace.addDataCenter(dataCenterName, getReplicationFactor(dataCenter)); + } + break; + } + } else { + keyspace.setPlacementStrategy(KeySpace.PlacementStrategy.SimpleStrategy); + keyspace.setReplicationFactor(1); + } + cassandraMapping.setKeySpace(keyspace); + } + + private void processCassandraKeys(CassandraMapping cassandraMapping, Element key, String keyName) { + CassandraKey cassandraKey = new CassandraKey(keyName); + Element partitionKeys = key.getChild("partitionKey"); + Element clusterKeys = key.getChild("clusterKey"); + List<Element> partitionKeyFields = 
partitionKeys.getChildren("field"); + List<Element> partitionCompositeKeyFields = partitionKeys.getChildren("compositeKey"); + // process non composite partition keys + for (Element partitionKeyField : partitionKeyFields) { + PartitionKeyField fieldKey = new PartitionKeyField(); + List fieldAttributes = partitionKeyField.getAttributes(); + processAttributes(fieldAttributes, fieldKey); + cassandraKey.addPartitionKeyField(fieldKey); + } + // process composite partitions keys + for (Element partitionCompositeKeyField : partitionCompositeKeyFields) { + PartitionKeyField compositeFieldKey = new PartitionKeyField(); + compositeFieldKey.setComposite(true); + List<Element> compositeKeyFields = partitionCompositeKeyField.getChildren("field"); + for (Element partitionKeyField : compositeKeyFields) { + PartitionKeyField fieldKey = new PartitionKeyField(); + List fieldAttributes = partitionKeyField.getAttributes(); + processAttributes(fieldAttributes, fieldKey); + compositeFieldKey.addField(fieldKey); + } + cassandraKey.addPartitionKeyField(compositeFieldKey); + } + + //process cluster keys + List<Element> clusterKeyFields = clusterKeys.getChildren("key"); + for (Element clusterKeyField : clusterKeyFields) { + ClusterKeyField keyField = new ClusterKeyField(); + List fieldAttributes = clusterKeyField.getAttributes(); + for (Object anAttributeList : fieldAttributes) { + Attribute attribute = (Attribute) anAttributeList; + String attributeName = attribute.getName(); + String attributeValue = attribute.getValue(); + switch (attributeName) { + case "column": + keyField.setColumnName(attributeValue); + break; + case "order": + keyField.setOrder(ClusterKeyField.Order.valueOf(attributeValue.toUpperCase(Locale.ENGLISH))); + break; + default: + LOG.warn("{} attribute is Unsupported or Invalid, in {} Cassandra Key. 
Please configure the cassandra mapping correctly.", new Object[]{attributeName, keyName}); + break; + } + } + cassandraKey.addClusterKeyField(keyField); + } + cassandraMapping.setCassandraKey(cassandraKey); + } + + private void processAttributes(List<Element> attributes, Field fieldKey) { + for (Object anAttributeList : attributes) { + Attribute attribute = (Attribute) anAttributeList; + String attributeName = attribute.getName(); + String attributeValue = attribute.getValue(); + switch (attributeName) { + case "name": + fieldKey.setFieldName(attributeValue); + break; + case "column": + fieldKey.setColumnName(attributeValue); + break; + case "type": + // replace UDT into frozen + if (attributeValue.contains("udt(")) { + attributeValue = attributeValue.replace("udt(", "frozen("); + } + fieldKey.setType(attributeValue.replace("(", "<").replace(")", ">")); + if (fieldKey.getType().equalsIgnoreCase("udt")) { + throw new RuntimeException("Invalid udt type, Please enter dataType for udt with a unique name for particular user define data type, like udt(metadata)."); + } + break; + default: + fieldKey.addProperty(attributeName, attributeValue); + break; + } + } + } +}
http://git-wip-us.apache.org/repos/asf/gora/blob/89683c74/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java ---------------------------------------------------------------------- diff --git a/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java b/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java new file mode 100644 index 0000000..129c49f --- /dev/null +++ b/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java @@ -0,0 +1,380 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.gora.cassandra.store; + +import org.apache.gora.cassandra.query.CassandraQuery; +import org.apache.gora.cassandra.serializers.CassandraSerializer; +import org.apache.gora.persistency.BeanFactory; +import org.apache.gora.persistency.Persistent; +import org.apache.gora.persistency.impl.BeanFactoryImpl; +import org.apache.gora.query.PartitionQuery; +import org.apache.gora.query.Query; +import org.apache.gora.query.Result; +import org.apache.gora.query.ws.impl.PartitionWSQueryImpl; +import org.apache.gora.store.DataStore; +import org.apache.gora.store.DataStoreFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; + +/** + * Implementation of Cassandra Store. + * + * @param <K> key class + * @param <T> persistent class + */ +public class CassandraStore<K, T extends Persistent> implements DataStore<K, T> { + + private static final String DEFAULT_MAPPING_FILE = "gora-cassandra-mapping.xml"; + + private static final Logger LOG = LoggerFactory.getLogger(CassandraStore.class); + + private BeanFactory<K, T> beanFactory; + + private Class<K> keyClass; + + private Class<T> persistentClass; + + private CassandraMapping mapping; + + private CassandraSerializer cassandraSerializer; + + public CassandraStore() { + super(); + } + + /** + * In initializing the cassandra datastore, read the mapping file, creates the basic connection to cassandra cluster, + * according to the gora properties + * + * @param keyClass key class + * @param persistentClass persistent class + * @param properties properties + */ + @Override + public void initialize(Class<K> keyClass, Class<T> persistentClass, Properties properties) { + LOG.debug("Initializing Cassandra store"); + String serializationType; + try { + this.keyClass = keyClass; + this.persistentClass = persistentClass; + if (this.beanFactory == null) { + this.beanFactory = new 
BeanFactoryImpl<>(keyClass, persistentClass); + } + String mappingFile = DataStoreFactory.getMappingFile(properties, this, DEFAULT_MAPPING_FILE); + serializationType = properties.getProperty(CassandraStoreParameters.CASSANDRA_SERIALIZATION_TYPE); + CassandraMappingBuilder mappingBuilder = new CassandraMappingBuilder(this); + mapping = mappingBuilder.readMapping(mappingFile); + CassandraClient cassandraClient = new CassandraClient(); + cassandraClient.initialize(properties, mapping); + cassandraSerializer = CassandraSerializer.getSerializer(cassandraClient, serializationType, this, mapping); + } catch (Exception e) { + throw new RuntimeException("Error while initializing Cassandra store: " + e.getMessage(), e); + } + } + + /** + * {@inheritDoc} + * + * @return + */ + @SuppressWarnings("all") + @Override + public Class<T> getPersistentClass() { + return (Class<T>) this.persistentClass; + } + + /** + * {@inheritDoc} + * <p> + * This is a setter method to set the class of persistent objects. 
+ * + * @param persistentClass class of persistent objects + * {@link org.apache.gora.persistency.Persistent} + */ + @Override + public void setPersistentClass(Class<T> persistentClass) { + this.persistentClass = persistentClass; + } + + /** + * {@inheritDoc} + * + * @return + */ + @Override + public String getSchemaName() { + return mapping.getCoreName(); + } + + /** + * {@inheritDoc} + */ + @Override + public void createSchema() { + cassandraSerializer.createSchema(); + } + + /** + * {@inheritDoc} + */ + @Override + public void deleteSchema() { + cassandraSerializer.deleteSchema(); + } + + /** + * {@inheritDoc} + * + * @return + */ + @SuppressWarnings("all") + @Override + public Class<K> getKeyClass() { + return this.keyClass; + } + + /** + * {@inheritDoc} + * + * @param keyClass the class of keys + */ + @Override + public void setKeyClass(Class<K> keyClass) { + this.keyClass = keyClass; + } + + /** + * {@inheritDoc} + * + * @return + */ + @Override + public K newKey() { + try { + if (beanFactory != null) { + return beanFactory.newKey(); + } else { + return keyClass.newInstance(); + } + } catch (Exception ex) { + throw new RuntimeException("Error while instantiating a key: " + ex.getMessage(), ex); + } + } + + /** + * {@inheritDoc} + * + * @return + */ + @SuppressWarnings("all") + @Override + public T newPersistent() { + try { + if (beanFactory != null) { + return this.beanFactory.newPersistent(); + } else { + return persistentClass.newInstance(); + } + } catch (Exception ex) { + throw new RuntimeException("Error while instantiating a persistent: " + ex.getMessage(), ex); + } + } + + /** + * {@inheritDoc} + * + * @return + */ + @Override + public BeanFactory<K, T> getBeanFactory() { + return this.beanFactory; + } + + /** + * {@inheritDoc} + * + * @param beanFactory the BeanFactory to use + */ + @Override + public void setBeanFactory(BeanFactory<K, T> beanFactory) { + this.beanFactory = beanFactory; + } + + /** + * {@inheritDoc} + */ + @Override + public void 
close() { + this.cassandraSerializer.close(); + } + + /** + * {@inheritDoc} + * + * @param key the key of the object + * @return + */ + @Override + public T get(K key) { + return (T) cassandraSerializer.get(key); + } + + /** + * {@inheritDoc} + * + * @param key the key of the object + * @param fields the fields required in the object. Pass null, to retrieve all fields + * @return + */ + @Override + public T get(K key, String[] fields) { + return (T) cassandraSerializer.get(key, fields); + } + + /** + * {@inheritDoc} + * + * @param key key value + * @param obj object value + */ + @Override + public void put(K key, T obj) { + cassandraSerializer.put(key, obj); + } + + /** + * {@inheritDoc} + * + * @param key the key of the object + * @return + */ + @Override + public boolean delete(K key) { + return cassandraSerializer.delete(key); + } + + /** + * {@inheritDoc} + * + * @param query matching records to this query will be deleted + * @return + */ + @Override + public long deleteByQuery(Query<K, T> query) { + return cassandraSerializer.deleteByQuery(query); + } + + /** + * {@inheritDoc} + * + * @param query the query to execute. + * @return + */ + @Override + public Result<K, T> execute(Query<K, T> query) { + return (Result<K, T>) cassandraSerializer.execute(this, query); + } + + /** + * This method is used to update multiple objects in the table. 
+ * + * @param query Query + * @return isQuery applied or not + */ + public boolean updateByQuery(Query<K, T> query) { + return cassandraSerializer.updateByQuery(query); + } + + /** + * {@inheritDoc} + * + * @return + */ + @Override + public Query<K, T> newQuery() { + Query<K, T> query = new CassandraQuery(this); + return query; + } + + /** + * {@inheritDoc} + * + * @param query cassandra Query + * @return + * @throws IOException + */ + @Override + public List<PartitionQuery<K, T>> getPartitions(Query<K, T> query) throws IOException { + List<PartitionQuery<K, T>> partitions = new ArrayList<>(); + PartitionWSQueryImpl<K, T> pqi = new PartitionWSQueryImpl<>(query); + pqi.setDataStore(this); + partitions.add(pqi); + return partitions; + } + + /** + * {@inheritDoc} + */ + @Override + public void flush() { + // ignore since caching has been disabled + } + + /** + * {@inheritDoc} + * + * @param obj + * @return + */ + @Override + public boolean equals(Object obj) { + return super.equals(obj); + } + + /** + * {@inheritDoc} + */ + @Override + public void truncateSchema() { + cassandraSerializer.truncateSchema(); + } + + /** + * {@inheritDoc} + * + * @return + */ + @Override + public boolean schemaExists() { + return cassandraSerializer.schemaExists(); + } + + public enum SerializerType { + AVRO("AVRO"), NATIVE("NATIVE"); + String val; + + SerializerType(String v) { + this.val = v; + } + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/gora/blob/89683c74/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStoreParameters.java ---------------------------------------------------------------------- diff --git a/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStoreParameters.java b/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStoreParameters.java new file mode 100644 index 0000000..7032dab --- /dev/null +++ 
b/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStoreParameters.java @@ -0,0 +1,236 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gora.cassandra.store; + +/** + * Names of the configuration properties of the Cassandra store. + * <p> + * Each constant holds a property key; the last line of each comment states the documented + * value type or the accepted values. + */ +public class CassandraStoreParameters { + + /** + * Property key for the Cassandra contact points. + * string (multiple values, comma separated) + */ + public static final String CASSANDRA_SERVERS = "gora.cassandrastore.cassandraServers"; + /** + * Property key for the Cassandra keyspace. + * string + */ + public static final String KEYSPACE = "gora.cassandrastore.keyspace"; + /** + * Property key for the port to use to connect to the Cassandra hosts. + * integer + */ + public static final String PORT = "gora.cassandrastore.port"; + + /** + * Property key for the Cassandra cluster name. + * string + */ + public static final String CLUSTER_NAME = "gora.cassandrastore.clusterName"; + /** + * Property key for the compression to use for the transport. + * "LZ4", "SNAPPY", "NONE" + */ + public static final String COMPRESSION = "gora.cassandrastore.compression"; + /** + * Property key for the username used to connect to the server. 
+ * string + */ + public static final String USERNAME = "gora.cassandrastore.username"; + /** + * Property key for the password used to connect to the server. + * string + */ + public static final String PASSWORD = "gora.cassandrastore.password"; + /** + * Property key for the load balancing policy. + * "RoundRobinPolicy", "LatencyAwareRoundRobinPolicy", "TokenAwareRoundRobinPolicy" + */ + public static final String LOAD_BALANCING_POLICY = "gora.cassandrastore.loadBalancingPolicy"; + /** + * Property key used to enable/disable JMX reporting. + * boolean + */ + public static final String ENABLE_JMX_REPORTING = "gora.cassandrastore.enableJMXReporting"; + /** + * Property key used to enable/disable metrics. + * boolean + */ + public static final String ENABLE_METRICS = "gora.cassandrastore.enableMetrics"; + /** + * Property key for the local host core connections size. + * integer + */ + public static final String LOCAL_CORE_CONNECTIONS_PER_HOST = "gora.cassandrastore.localCoreConnectionsPerHost"; + /** + * Property key for the remote host core connections size. + * integer + */ + public static final String REMOTE_CORE_CONNECTIONS_PER_HOST = "gora.cassandrastore.remoteCoreConnectionsPerHost"; + /** + * Property key for the local host max connections size. + * integer + */ + public static final String LOCAL_MAX_CONNECTIONS_PER_HOST = "gora.cassandrastore.localMaxConnectionsPerHost"; + /** + * Property key for the remote host max connections size. + * integer + */ + public static final String REMOTE_MAX_CONNECTIONS_PER_HOST = "gora.cassandrastore.remoteMaxConnectionsPerHost"; + /** + * Property key for the local host new connection threshold. + * integer + */ + public static final String LOCAL_NEW_CONNECTION_THRESHOLD = "gora.cassandrastore.localNewConnectionThreshold"; + /** + * Property key for the remote host new connection threshold. 
+ * integer + */ + public static final String REMOTE_NEW_CONNECTION_THRESHOLD = "gora.cassandrastore.remoteNewConnectionThreshold"; + /** + * Property key for the local host max requests per connection. + * integer + */ + public static final String LOCAL_MAX_REQUESTS_PER_CONNECTION = "gora.cassandrastore.localMaxRequestsPerConnection"; + /** + * Property key for the remote host max requests per connection. + * integer + */ + public static final String REMOTE_MAX_REQUESTS_PER_CONNECTION = "gora.cassandrastore.remoteMaxRequestsPerConnection"; + /** + * Property key for the CQL Protocol version. + * integer + */ + public static final String PROTOCOL_VERSION = "gora.cassandrastore.protocolVersion"; + /** + * Property key for the consistency level in Cassandra Query Options. + * "ALL", "ANY", "EACH_QUORUM", "LOCAL_ONE", "LOCAL_QUORUM", "LOCAL_SERIAL", "ONE", "QUORUM", "SERIAL", "THREE", "TWO" + */ + public static final String CONSISTENCY_LEVEL = "gora.cassandrastore.consistencyLevel"; + /** + * Property key for the fetch size in Cassandra Query Options. + * integer + * <p> + * NOTE(review): unlike every other key in this class, this key has no + * "gora.cassandrastore." prefix - confirm whether that is intentional. + */ + public static final String FETCH_SIZE = "fetchSize"; + /** + * Property key for the serial consistency level in Cassandra Query Options. + * "ALL", "ANY", "EACH_QUORUM", "LOCAL_ONE", "LOCAL_QUORUM", "LOCAL_SERIAL", "ONE", "QUORUM", "SERIAL", "THREE", "TWO" + */ + public static final String SERIAL_CONSISTENCY_LEVEL = "gora.cassandrastore.serialConsistencyLevel"; + /** + * Property key for the reconnection policy. + * "ConstantReconnectionPolicy", "ExponentialReconnectionPolicy" + */ + public static final String RECONNECTION_POLICY = "gora.cassandrastore.reconnectionPolicy"; + /** + * Property key for the delay in constant reconnection policy. 
+ * long + */ + public static final String CONSTANT_RECONNECTION_POLICY_DELAY = "gora.cassandrastore.constantReconnectionPolicyDelay"; + /** + * Property key for the base delay in exponential reconnection policy. 
+ * long + */ + public static final String EXPONENTIAL_RECONNECTION_POLICY_BASE_DELAY = "gora.cassandrastore.exponentialReconnectionPolicyBaseDelay"; + /** + * Property key for the max delay in exponential reconnection policy. + * long + */ + public static final String EXPONENTIAL_RECONNECTION_POLICY_MAX_DELAY = "gora.cassandrastore.exponentialReconnectionPolicyMaxDelay"; + /** + * Property key for the retry policy. + * "DefaultRetryPolicy", "DowngradingConsistencyRetryPolicy", "FallthroughRetryPolicy", + * "LoggingDefaultRetryPolicy", "LoggingDowngradingConsistencyRetryPolicy", "LoggingFallthroughRetryPolicy" + */ + public static final String RETRY_POLICY = "gora.cassandrastore.retryPolicy"; + /** + * Property key for the connection time out in Cassandra Socket Options. + * integer + */ + public static final String CONNECTION_TIMEOUT_MILLIS = "gora.cassandrastore.connectionTimeoutMillis"; + /** + * Property key for the keep alive in Cassandra Socket Options. + * boolean + */ + public static final String KEEP_ALIVE = "gora.cassandrastore.keepAlive"; + /** + * Property key for the read time out in Cassandra Socket Options. + * integer + */ + public static final String READ_TIMEOUT_MILLIS = "gora.cassandrastore.readTimeoutMillis"; + /** + * Property key for the receiver buffer size in Cassandra Socket Options. + * integer + */ + public static final String RECEIVER_BUFFER_SIZE = "gora.cassandrastore.receiverBufferSize"; + /** + * Property key for the reuse address in Cassandra Socket Options. + * boolean + */ + public static final String REUSE_ADDRESS = "gora.cassandrastore.reuseAddress"; + /** + * Property key for the sender buffer size in Cassandra Socket Options. + * integer + */ + public static final String SEND_BUFFER_SIZE = "gora.cassandrastore.sendBufferSize"; + /** + * Property key for the soLinger in Cassandra Socket Options. 
+ * integer + */ + public static final String SO_LINGER = "gora.cassandrastore.soLinger"; + /** + * Property key for the no tcp delay in Cassandra Socket Options. + * boolean + */ + public static final String TCP_NODELAY = "gora.cassandrastore.tcpNoDelay"; + /** + * Property key used to enable SSL. + * boolean + */ + public static final String ENABLE_SSL = "gora.cassandrastore.enableSSL"; + /** + * Property key for the local data center to be aware of. + * string + */ + public static final String DATA_CENTER = "gora.cassandrastore.dataCenter"; + /** + * Property key used to enable/disable remote data centers for local consistency level. + * string + * <p> + * NOTE(review): documented as "string" although it describes an enable/disable switch - + * confirm the expected value format. + */ + public static final String ALLOW_REMOTE_DCS_FOR_LOCAL_CONSISTENCY_LEVEL = "gora.cassandrastore.allowRemoteDCsForLocalConsistencyLevel"; + /** + * Property key for the serialization type to use. + * avro / native + */ + public static final String CASSANDRA_SERIALIZATION_TYPE = "gora.cassandrastore.cassandraSerializationType"; + /** + * Property key for the custom codec file. 
+ * string + */ + public static final String CUSTOM_CODEC_FILE = "gora.cassandrastore.custom.codec.file"; + /** + * Property key for the consistency level of read queries. + * "ALL", "ANY", "EACH_QUORUM", "LOCAL_ONE", "LOCAL_QUORUM", "LOCAL_SERIAL", "ONE", "QUORUM", "SERIAL", "THREE", "TWO" + */ + public static final String READ_CONSISTENCY_LEVEL = "gora.cassandrastore.read.consistencyLevel"; + /** + * Property key for the consistency level of write queries. + * "ALL", "ANY", "EACH_QUORUM", "LOCAL_ONE", "LOCAL_QUORUM", "LOCAL_SERIAL", "ONE", "QUORUM", "SERIAL", "THREE", "TWO" + */ + public static final String WRITE_CONSISTENCY_LEVEL = "gora.cassandrastore.write.consistencyLevel"; +} http://git-wip-us.apache.org/repos/asf/gora/blob/89683c74/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/package-info.java ---------------------------------------------------------------------- diff --git a/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/package-info.java b/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/package-info.java new file mode 100644 index 0000000..2cd9003 --- /dev/null +++ b/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/package-info.java @@ -0,0 +1,21 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * This package contains all the Cassandra store related classes. + */ +package org.apache.gora.cassandra.store; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/gora/blob/89683c74/gora-cassandra/src/test/conf/avro/gora-cassandra-mapping.xml ---------------------------------------------------------------------- diff --git a/gora-cassandra/src/test/conf/avro/gora-cassandra-mapping.xml b/gora-cassandra/src/test/conf/avro/gora-cassandra-mapping.xml new file mode 100644 index 0000000..9eb71b2 --- /dev/null +++ b/gora-cassandra/src/test/conf/avro/gora-cassandra-mapping.xml @@ -0,0 +1,50 @@ +<?xml version="1.0" encoding="UTF-8"?> + +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+--> + +<gora-otd> + + <keyspace name="avroKeySpace" durableWrite="false"> + <placementStrategy name="SimpleStrategy" replicationFactor="1"/> + </keyspace> + + <class name="org.apache.gora.examples.generated.WebPage" keyClass="java.lang.String" table="WebPage" + allowFiltering="true" + keyspace="avroKeySpace"> + <field name="url" column="url" type="ascii"/> + <field name="content" column="content" type="blob"/> + <field name="parsedContent" column="parsedContent" type="list(ascii)"/> + <field name="outlinks" column="outlinks" type="map(text,text)"/> + <field name="headers" column="headers" type="map(text,text)"/> + <field name="byteData" column="byteData" type="map(text,blob)"/> + <field name="metadata" column="metadata" type="blob"/> + <field name="stringData" column="stringData" type="map(text,ascii)"/> + </class> + + <class name="org.apache.gora.examples.generated.Employee" keyClass="java.lang.String" keyspace="avroKeySpace" + allowFiltering="true" + table="Employee" compactStorage="true"> + <field name="name" column="name" type="text"/> + <field name="dateOfBirth" column="dob" type="bigint"/> + <field name="ssn" column="ssn" type="text"/> + <field name="salary" column="salary" type="int"/> + <field name="boss" column="boss" type="blob"/> + <field name="webpage" column="webpage" type="blob"/> + </class> + +</gora-otd> http://git-wip-us.apache.org/repos/asf/gora/blob/89683c74/gora-cassandra/src/test/conf/avroUDT/gora-cassandra-mapping.xml ---------------------------------------------------------------------- diff --git a/gora-cassandra/src/test/conf/avroUDT/gora-cassandra-mapping.xml b/gora-cassandra/src/test/conf/avroUDT/gora-cassandra-mapping.xml new file mode 100644 index 0000000..3c71182 --- /dev/null +++ b/gora-cassandra/src/test/conf/avroUDT/gora-cassandra-mapping.xml @@ -0,0 +1,50 @@ +<?xml version="1.0" encoding="UTF-8"?> + +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. 
See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> + +<gora-otd> + + <keyspace name="avroKeySpace" durableWrite="false"> + <placementStrategy name="SimpleStrategy" replicationFactor="1"/> + </keyspace> + + <class name="org.apache.gora.examples.generated.WebPage" keyClass="java.lang.String" table="WebPage" + allowFiltering="true" + keyspace="avroKeySpace"> + <field name="url" column="url" type="ascii"/> + <field name="content" column="content" type="blob"/> + <field name="parsedContent" column="parsedContent" type="list(ascii)"/> + <field name="outlinks" column="outlinks" type="map(text,text)"/> + <field name="headers" column="headers" type="map(text,text)"/> + <field name="byteData" column="byteData" type="map(text,blob)"/> + <field name="metadata" column="metadata" type="udt(metadata)"/> + <field name="stringData" column="stringData" type="map(text,ascii)"/> + </class> + + <class name="org.apache.gora.examples.generated.Employee" keyClass="java.lang.String" keyspace="avroKeySpace" + allowFiltering="true" + table="Employee" compactStorage="true"> + <field name="name" column="name" type="text"/> + <field name="dateOfBirth" column="dob" type="bigint"/> + <field name="ssn" column="ssn" type="text"/> + <field name="salary" column="salary" type="int"/> + <field name="boss" column="boss" type="blob"/> + <field name="webpage" column="webpage" type="blob"/> + 
</class> + +</gora-otd>