http://git-wip-us.apache.org/repos/asf/gora/blob/cbed41d0/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/TypeUtils.java ---------------------------------------------------------------------- diff --git a/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/TypeUtils.java b/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/TypeUtils.java deleted file mode 100644 index c5db72b..0000000 --- a/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/TypeUtils.java +++ /dev/null @@ -1,232 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.gora.cassandra.serializers; - -import java.nio.ByteBuffer; -import java.util.List; -import java.util.Map; - -import org.apache.avro.Schema; -import org.apache.avro.Schema.Type; -import org.apache.avro.generic.GenericArray; -import org.apache.avro.specific.SpecificFixed; -import org.apache.avro.util.Utf8; -import org.apache.gora.persistency.Persistent; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Utility class that infers the concrete Serializer needed to turn a value into - * its binary representation - */ -public class TypeUtils { - - public static final Logger LOG = LoggerFactory.getLogger(TypeUtils.class); - - // @SuppressWarnings({ "rawtypes", "unchecked" }) - public static Class<? extends Object> getClass(Object value) { - return value.getClass(); - } - - public static Schema getSchema(Object value) { - if (value instanceof GenericArray) { - return Schema.createArray( getElementSchema((GenericArray<?>)value) ); - } else { - return getSchema( getClass(value) ); - } - } - - public static Type getType(Object value) { - return getType( getClass(value) ); - } - - public static Type getType(Class<?> clazz) { - if (clazz.equals(Utf8.class)) { - return Type.STRING; - } else if (clazz.equals(Boolean.class) || clazz.equals(boolean.class)) { - return Type.BOOLEAN; - } else if (clazz.equals(ByteBuffer.class)) { - return Type.BYTES; - } else if (clazz.equals(Double.class) || clazz.equals(double.class)) { - return Type.DOUBLE; - } else if (clazz.equals(Float.class) || clazz.equals(float.class)) { - return Type.FLOAT; - } else if (clazz.equals(Integer.class) || clazz.equals(int.class)) { - return Type.INT; - } else if (clazz.equals(Long.class) || clazz.equals(long.class)) { - return Type.LONG; - } else if (clazz.isAssignableFrom(List.class)) { - return Type.ARRAY; - } else if (clazz.isAssignableFrom(Map.class)) { - return Type.MAP; - } else if (clazz.equals(Persistent.class)) { - return Type.RECORD; - } else if (clazz.getSuperclass().equals(SpecificFixed.class)) { - return Type.FIXED; - } else { - return null; - } - } - - public static Class<?> getClass(Type type) { - if (type == Type.STRING) { - return Utf8.class; - } else if (type == Type.BOOLEAN) { - return Boolean.class; - } else if (type == Type.BYTES) { - return ByteBuffer.class; - } else if (type == Type.DOUBLE) { - return Double.class; - } else if (type == Type.FLOAT) { - return Float.class; - } else if (type == Type.INT) { - return Integer.class; - } else if (type == Type.LONG) { - return Long.class; - } else if (type == Type.ARRAY) { - return List.class; - } else if (type == Type.MAP) { - return Map.class; - } else if (type == Type.RECORD) { - return Persistent.class; - } else if (type == Type.FIXED) { - // return SpecificFixed.class; - return null; - } else { - return null; - } - } - - public static Schema getSchema(Class<?> clazz) { - Type type = getType(clazz); - if (type == null) { - return null; - } else if (type == Type.FIXED) { - int size = getFixedSize(clazz); - String name = clazz.getName(); - String space = null; - int n = name.lastIndexOf("."); - if (n < 0) { - space = name.substring(0,n); - name = name.substring(n+1); - } else { - space = null; - } - String doc = null; // ? - // LOG.info(Schema.createFixed(name, doc, space, size).toString()); - return Schema.createFixed(name, doc, space, size); - } else if (type == Type.ARRAY) { - Object obj = null; - try { - obj = clazz.newInstance(); - } catch (InstantiationException e) { - LOG.warn(e.toString()); - return null; - } catch (IllegalAccessException e) { - LOG.warn(e.toString()); - return null; - } - return getSchema(obj); - } else if (type == Type.MAP) { - // TODO - // return Schema.createMap(...); - return null; - } else if (type == Type.RECORD) { - // TODO - // return Schema.createRecord(...); - return null; - } else { - return Schema.create(type); - } - } - - public static Class<?> getClass(Schema schema) { - Type type = schema.getType(); - if (type == null) { - return null; - } else if (type == Type.FIXED) { - try { - return Class.forName( schema.getFullName() ); - } catch (ClassNotFoundException e) { - LOG.warn(e.toString() + " : " + schema); - return null; - } - } else { - return getClass(type); - } - } - - public static int getFixedSize(Type type) { - if (type == Type.BOOLEAN) { - return 1; - } else if (type == Type.DOUBLE) { - return 8; - } else if (type == Type.FLOAT) { - return 4; - } else if (type == Type.INT) { - return 4; - } else if (type == Type.LONG) { - return 8; - } else { - return -1; - } - } - - public static int getFixedSize(Schema schema) { - Type type = schema.getType(); - if (type == Type.FIXED) { - return schema.getFixedSize(); - } else { - return getFixedSize(type); - } - } - - public static int getFixedSize(Class<?> clazz) { - Type type = getType(clazz); - if (type == Type.FIXED) { - try { - return ((SpecificFixed)clazz.newInstance()).bytes().length; - } catch (InstantiationException e) { - LOG.warn(e.toString()); - return -1; - } catch (IllegalAccessException e) { - LOG.warn(e.toString()); - return -1; - } - } else { - return getFixedSize(type); - } - } - - public static Schema getElementSchema(GenericArray<?> array) { - Schema schema = array.getSchema(); - return (schema.getType() == Type.ARRAY) ? schema.getElementType() : schema; - } - - /* - public static Schema getValueSchema(StatefulHashMap map) { - return map.getSchema().getValueType(); - } - - public static Type getValueType(StatefulHashMap map) { - return getValueSchema(map).getType(); - } - */ - -}
http://git-wip-us.apache.org/repos/asf/gora/blob/cbed41d0/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/package-info.java ---------------------------------------------------------------------- diff --git a/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/package-info.java b/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/package-info.java deleted file mode 100644 index 5d22d94..0000000 --- a/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/package-info.java +++ /dev/null @@ -1,20 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -/** - * This package contains Cassandra store related util classes for serializer. - */ -package org.apache.gora.cassandra.serializers; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/gora/blob/cbed41d0/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java ---------------------------------------------------------------------- diff --git a/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java b/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java deleted file mode 100644 index 1f9d614..0000000 --- a/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java +++ /dev/null @@ -1,658 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.gora.cassandra.store; - -import static org.apache.gora.cassandra.store.CassandraStore.colFamConsLvl; -import static org.apache.gora.cassandra.store.CassandraStore.readOpConsLvl; -import static org.apache.gora.cassandra.store.CassandraStore.writeOpConsLvl; - -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Properties; - -import me.prettyprint.cassandra.model.ConfigurableConsistencyLevel; -import me.prettyprint.cassandra.serializers.ByteBufferSerializer; -import me.prettyprint.cassandra.serializers.IntegerSerializer; -import me.prettyprint.cassandra.serializers.StringSerializer; -import me.prettyprint.cassandra.service.CassandraHostConfigurator; -import me.prettyprint.hector.api.Cluster; -import me.prettyprint.hector.api.Keyspace; -import me.prettyprint.hector.api.beans.OrderedRows; -import me.prettyprint.hector.api.beans.OrderedSuperRows; -import me.prettyprint.hector.api.beans.Row; -import me.prettyprint.hector.api.beans.SuperRow; -import me.prettyprint.hector.api.ddl.ColumnFamilyDefinition; -import me.prettyprint.hector.api.ddl.ComparatorType; -import me.prettyprint.hector.api.ddl.KeyspaceDefinition; -import me.prettyprint.hector.api.factory.HFactory; -import me.prettyprint.hector.api.mutation.Mutator; -import me.prettyprint.hector.api.query.QueryResult; -import me.prettyprint.hector.api.query.RangeSlicesQuery; -import me.prettyprint.hector.api.query.RangeSuperSlicesQuery; -import me.prettyprint.hector.api.HConsistencyLevel; -import me.prettyprint.hector.api.Serializer; - -import org.apache.avro.Schema; -import org.apache.avro.Schema.Type; -import org.apache.avro.generic.GenericArray; -import org.apache.gora.cassandra.query.CassandraQuery; -import org.apache.gora.cassandra.serializers.GoraSerializerTypeInferer; -import org.apache.gora.mapreduce.GoraRecordReader; -import org.apache.gora.persistency.impl.PersistentBase; -import org.apache.gora.query.Query; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * CassandraClient is where all of the primary datastore functionality is - * executed. Typically CassandraClient is invoked by calling - * {@link org.apache.gora.cassandra.store.CassandraStore#initialize(Class, Class, Properties)}. - * CassandraClient deals with Cassandra data model definition, mutation, - * and general/specific mappings. - * {@link org.apache.gora.cassandra.store.CassandraStore#initialize} . - * - * @param <K> - * @param <T> - */ -public class CassandraClient<K, T extends PersistentBase> { - - /** The logging implementation */ - public static final Logger LOG = LoggerFactory.getLogger(CassandraClient.class); - - private Cluster cluster; - private Keyspace keyspace; - private Mutator<K> mutator; - private Class<K> keyClass; - private Class<T> persistentClass; - - /** Object which holds the XML mapping for Cassandra. */ - private CassandraMapping cassandraMapping = null; - - /** Hector client default column family consistency level. */ - public static final String DEFAULT_HECTOR_CONSIS_LEVEL = "QUORUM"; - - /** Cassandra serializer to be used for serializing Gora's keys. */ - private Serializer<K> keySerializer; - - /** - * Method to maintain backward compatibility with earlier versions. - * @param keyClass - * @param persistentClass - * @throws Exception - */ - public void initialize(Class<K> keyClass, Class<T> persistentClass) - throws Exception { - initialize(keyClass, persistentClass, null); - } - - /** - * Given our key, persistentClass from - * {@link org.apache.gora.cassandra.store.CassandraStore#initialize(Class, Class, Properties)} - * we make best efforts to dictate our data model. - * We make a quick check within {@link org.apache.gora.cassandra.store.CassandraClient#checkKeyspace() } - * to see if our keyspace has already been invented, this simple check prevents us from - * recreating the keyspace if it already exists. - * We then simple specify (based on the input keyclass) an appropriate serializer - * via {@link org.apache.gora.cassandra.serializers.GoraSerializerTypeInferer} before - * defining a mutator from and by which we can mutate this object. - * - * @param keyClass the Key by which we wish o assign a record object - * @param persistentClass the generated {@link org.apache.gora.persistency.Persistent} bean representing the data. - * @param properties key value pairs from gora.properties - * @throws Exception - */ - public void initialize(Class<K> keyClass, Class<T> persistentClass, Properties properties) throws Exception { - this.keyClass = keyClass; - - // get cassandra mapping with persistent class - this.persistentClass = persistentClass; - this.cassandraMapping = CassandraMappingManager.getManager().get(persistentClass); - Map<String, String> accessMap = null; - if (properties != null) { - String username = properties - .getProperty("gora.cassandrastore.username"); - if (username != null) { - accessMap = new HashMap<>(); - accessMap.put("username", username); - String password = properties - .getProperty("gora.cassandrastore.password"); - if (password != null) { - accessMap.put("password", password); - } - } - } - - this.cluster = HFactory.getOrCreateCluster(this.cassandraMapping.getClusterName(), - new CassandraHostConfigurator(this.cassandraMapping.getHostName()), accessMap); - - // add keyspace to cluster - checkKeyspace(); - - // Just create a Keyspace object on the client side, corresponding to an already - // existing keyspace with already created column families. - this.keyspace = HFactory.createKeyspace(this.cassandraMapping.getKeyspaceName(), this.cluster); - - this.keySerializer = GoraSerializerTypeInferer.getSerializer(keyClass); - if (this.keySerializer == null) - LOG.error("Serializer for " + keyClass + " not found."); - this.mutator = HFactory.createMutator(this.keyspace, this.keySerializer); - } - - /** - * Check if keyspace already exists. - * - * @return if keyspace already exists return true. - */ - public boolean keyspaceExists() { - KeyspaceDefinition keyspaceDefinition = this.cluster.describeKeyspace(this.cassandraMapping.getKeyspaceName()); - return (keyspaceDefinition != null); - } - - /** - * Check if keyspace already exists. If not, create it. - * In this method, we also utilize Hector's - * {@link me.prettyprint.cassandra.model.ConfigurableConsistencyLevel} logic. - * It is set by passing a - * {@link me.prettyprint.cassandra.model.ConfigurableConsistencyLevel} object right - * when the {@link me.prettyprint.hector.api.Keyspace} is created. - * If we cannot find a consistency level within <code>gora.properites</code>, - * then column family consistency level is set to QUORUM (by default) which permits - * consistency to wait for a quorum of replicas to respond regardless of data center. - * QUORUM is Hector Client's default setting and we respect that here as well. - * - * @see <a href="http://hector-client.github.io/hector/build/html/content/consistency_level.html">Consistency Level</a> - */ - public void checkKeyspace() { - // "describe keyspace <keyspaceName>;" query - KeyspaceDefinition keyspaceDefinition = this.cluster.describeKeyspace(this.cassandraMapping.getKeyspaceName()); - if (keyspaceDefinition == null) { - List<ColumnFamilyDefinition> columnFamilyDefinitions = this.cassandraMapping.getColumnFamilyDefinitions(); - - // GORA-197 - for (ColumnFamilyDefinition cfDef : columnFamilyDefinitions) { - cfDef.setComparatorType(ComparatorType.BYTESTYPE); - } - - keyspaceDefinition = HFactory.createKeyspaceDefinition( - this.cassandraMapping.getKeyspaceName(), - this.cassandraMapping.getKeyspaceReplicationStrategy(), - this.cassandraMapping.getKeyspaceReplicationFactor(), - columnFamilyDefinitions - ); - - this.cluster.addKeyspace(keyspaceDefinition, true); - - // GORA-167 Create a customized Consistency Level - ConfigurableConsistencyLevel ccl = new ConfigurableConsistencyLevel(); - Map<String, HConsistencyLevel> clmap = getConsisLevelForColFams(columnFamilyDefinitions); - // Column family consistency levels - ccl.setReadCfConsistencyLevels(clmap); - ccl.setWriteCfConsistencyLevels(clmap); - // Operations consistency levels - String opConsisLvl = (readOpConsLvl!=null && !readOpConsLvl.isEmpty())?readOpConsLvl:DEFAULT_HECTOR_CONSIS_LEVEL; - ccl.setDefaultReadConsistencyLevel(HConsistencyLevel.valueOf(opConsisLvl)); - LOG.debug("Hector read consistency configured to '" + opConsisLvl + "'."); - opConsisLvl = (writeOpConsLvl!=null && !writeOpConsLvl.isEmpty())?writeOpConsLvl:DEFAULT_HECTOR_CONSIS_LEVEL; - ccl.setDefaultWriteConsistencyLevel(HConsistencyLevel.valueOf(opConsisLvl)); - LOG.debug("Hector write consistency configured to '" + opConsisLvl + "'."); - - HFactory.createKeyspace("Keyspace", this.cluster, ccl); - keyspaceDefinition = null; - } - else { - List<ColumnFamilyDefinition> cfDefs = keyspaceDefinition.getCfDefs(); - if (cfDefs == null || cfDefs.size() == 0) { - LOG.warn(keyspaceDefinition.getName() + " does not have any column family."); - } - else { - for (ColumnFamilyDefinition cfDef : cfDefs) { - ComparatorType comparatorType = cfDef.getComparatorType(); - if (! comparatorType.equals(ComparatorType.BYTESTYPE)) { - // GORA-197 - LOG.warn("The comparator type of " + cfDef.getName() + " column family is " + comparatorType.getTypeName() - + ", not BytesType. It may cause a fatal error on column validation later."); - } - else { - LOG.debug("The comparator type of " + cfDef.getName() + " column family is " - + comparatorType.getTypeName() + "."); - } - } - } - } - } - - /** - * Method in charge of setting the consistency level for defined column families. - * @param pColFams Column families - * @return Map<String, HConsistencyLevel> with the mapping between colFams and consistency level. - */ - private Map<String, HConsistencyLevel> getConsisLevelForColFams(List<ColumnFamilyDefinition> pColFams) { - Map<String, HConsistencyLevel> clMap = new HashMap<>(); - // Get columnFamily consistency level. - String colFamConsisLvl = (colFamConsLvl != null && !colFamConsLvl.isEmpty())?colFamConsLvl:DEFAULT_HECTOR_CONSIS_LEVEL; - LOG.debug("ColumnFamily consistency level configured to '" + colFamConsisLvl + "'."); - // Define consistency for ColumnFamily "ColumnFamily" - for (ColumnFamilyDefinition colFamDef : pColFams) - clMap.put(colFamDef.getName(), HConsistencyLevel.valueOf(colFamConsisLvl)); - return clMap; - } - - /** - * Drop keyspace. - */ - public void dropKeyspace() { - this.cluster.dropKeyspace(this.cassandraMapping.getKeyspaceName()); - } - - /** - * Insert a field in a column. - * @param key the row key - * @param fieldName the field name - * @param value the field value. - */ - public void addColumn(K key, String fieldName, Object value) { - if (value == null) { - LOG.debug( "field:"+fieldName+", its value is null."); - return; - } - - ByteBuffer byteBuffer = toByteBuffer(value); - String columnFamily = this.cassandraMapping.getFamily(fieldName); - String columnName = this.cassandraMapping.getColumn(fieldName); - - if (columnName == null) { - LOG.warn("Column name is null for field: " + fieldName ); - return; - } - - if( LOG.isDebugEnabled() ) LOG.debug( "fieldName: "+fieldName +" columnName: " + columnName ); - - String ttlAttr = this.cassandraMapping.getColumnsAttribs().get(columnName); - - if ( null == ttlAttr ){ - ttlAttr = CassandraMapping.DEFAULT_COLUMNS_TTL; - if( LOG.isDebugEnabled() ) LOG.debug( "ttl was not set for field: " + fieldName + ". Using " + ttlAttr ); - } else { - if( LOG.isDebugEnabled() ) LOG.debug( "ttl for field: " + fieldName + " is " + ttlAttr ); - } - - synchronized(mutator) { - HectorUtils.insertColumn(mutator, key, columnFamily, columnName, byteBuffer, ttlAttr); - } - } - - /** - * Delete a row within the keyspace. - * - * @param key - * @param familyName - * @param columnName - */ - public void deleteColumn(K key, String familyName, ByteBuffer columnName) { - synchronized(mutator) { - HectorUtils.deleteColumn(mutator, key, familyName, columnName); - } - } - - /** - * Deletes an entry based on its key. - * @param key - */ - public void deleteByKey(K key) { - Map<String, String> familyMap = this.cassandraMapping.getFamilyMap(); - deleteColumn(key, familyMap.values().iterator().next(), null); - } - - /** - * Insert a member in a super column. This is used for map and record Avro types. - * @param key the row key - * @param fieldName the field name - * @param columnName the column name (the member name, or the index of array) - * @param value the member value - */ - public void addSubColumn(K key, String fieldName, ByteBuffer columnName, Object value) { - if (value == null) { - return; - } - - ByteBuffer byteBuffer = toByteBuffer(value); - - String columnFamily = this.cassandraMapping.getFamily(fieldName); - String superColumnName = this.cassandraMapping.getColumn(fieldName); - String ttlAttr = this.cassandraMapping.getColumnsAttribs().get(superColumnName); - if ( null == ttlAttr ) { - ttlAttr = CassandraMapping.DEFAULT_COLUMNS_TTL; - if( LOG.isDebugEnabled() ) LOG.debug( "ttl was not set for field:" + fieldName + " .Using " + ttlAttr ); - } else { - if( LOG.isDebugEnabled() ) LOG.debug( "ttl for field:" + fieldName + " is " + ttlAttr ); - } - - synchronized(mutator) { - HectorUtils.insertSubColumn(mutator, key, columnFamily, superColumnName, columnName, byteBuffer, ttlAttr); - } - } - - /** - * Adds an subColumn inside the cassandraMapping file when a String is serialized - * @param key the row key - * @param fieldName the field name - * @param columnName the column name (the member name, or the index of array) - * @param value the member value - */ - public void addSubColumn(K key, String fieldName, String columnName, Object value) { - addSubColumn(key, fieldName, StringSerializer.get().toByteBuffer(columnName), value); - } - - /** - * Adds an subColumn inside the cassandraMapping file when an Integer is serialized - * @param key the row key - * @param fieldName the field name - * @param columnName the column name (the member name, or the index of array) - * @param value the member value - */ - public void addSubColumn(K key, String fieldName, Integer columnName, Object value) { - addSubColumn(key, fieldName, IntegerSerializer.get().toByteBuffer(columnName), value); - } - - - /** - * Delete a member in a super column. This is used for map and record Avro types. - * @param key the row key - * @param fieldName the field name - * @param columnName the column name (the member name, or the index of array) - */ - public void deleteSubColumn(K key, String fieldName, ByteBuffer columnName) { - - String columnFamily = this.cassandraMapping.getFamily(fieldName); - String superColumnName = this.cassandraMapping.getColumn(fieldName); - - synchronized(mutator) { - HectorUtils.deleteSubColumn(mutator, key, columnFamily, superColumnName, columnName); - } - } - - /** - * Deletes a subColumn - * @param key - * @param fieldName - * @param columnName - */ - public void deleteSubColumn(K key, String fieldName, String columnName) { - deleteSubColumn(key, fieldName, StringSerializer.get().toByteBuffer(columnName)); - } - - /** - * Deletes all subcolumns from a super column. - * @param key the row key. - * @param fieldName the field name. - */ - public void deleteSubColumn(K key, String fieldName) { - String columnFamily = this.cassandraMapping.getFamily(fieldName); - String superColumnName = this.cassandraMapping.getColumn(fieldName); - synchronized(mutator) { - HectorUtils.deleteSubColumn(mutator, key, columnFamily, superColumnName, null); - } - } - - public void deleteGenericArray(K key, String fieldName) { - //TODO Verify this. Everything that goes inside a genericArray will go inside a column so let's just delete that. - deleteColumn(key, cassandraMapping.getFamily(fieldName), toByteBuffer(fieldName)); - } - - public void addGenericArray(K key, String fieldName, GenericArray<?> array) { - if (isSuper( cassandraMapping.getFamily(fieldName) )) { - int i= 0; - for (Object itemValue: array) { - - // TODO: hack, do not store empty arrays - if (itemValue instanceof GenericArray<?>) { - if (((List<?>)itemValue).size() == 0) { - continue; - } - } else if (itemValue instanceof Map<?,?>) { - if (((Map<?, ?>)itemValue).size() == 0) { - continue; - } - } - - addSubColumn(key, fieldName, i++, itemValue); - } - } - else { - addColumn(key, fieldName, array); - } - } - - public void deleteStatefulHashMap(K key, String fieldName) { - if (isSuper( cassandraMapping.getFamily(fieldName) )) { - deleteSubColumn(key, fieldName); - } else { - deleteColumn(key, cassandraMapping.getFamily(fieldName), toByteBuffer(fieldName)); - } - } - - public void addStatefulHashMap(K key, String fieldName, Map<CharSequence,Object> map) { - if (isSuper( cassandraMapping.getFamily(fieldName) )) { - // as we don't know what has changed inside the map or If it's an empty map, then delete its content. - deleteSubColumn(key, fieldName); - // update if there is anything to update. - if (!map.isEmpty()) { - // If it's not empty, then update its content. - for (CharSequence mapKey: map.keySet()) { - // TODO: hack, do not store empty arrays - Object mapValue = map.get(mapKey); - if (mapValue instanceof GenericArray<?>) { - if (((List<?>)mapValue).size() == 0) { - continue; - } - } else if (mapValue instanceof Map<?,?>) { - if (((Map<?, ?>)mapValue).size() == 0) { - continue; - } - } - addSubColumn(key, fieldName, mapKey.toString(), mapValue); - } - } - } - else { - addColumn(key, fieldName, map); - } - } - - /** - * Serialize value to ByteBuffer using - * {@link org.apache.gora.cassandra.serializers.GoraSerializerTypeInferer#getSerializer(Object)}. - * @param value the member value {@link java.lang.Object}. - * @return ByteBuffer object - */ - public ByteBuffer toByteBuffer(Object value) { - ByteBuffer byteBuffer = null; - Serializer<Object> serializer = GoraSerializerTypeInferer.getSerializer(value); - if (serializer == null) { - LOG.warn("Serializer not found for: " + value.toString()); - } - else { - LOG.debug(serializer.getClass() + " selected as appropriate Serializer."); - byteBuffer = serializer.toByteBuffer(value); - } - if (byteBuffer == null) { - LOG.warn("Serialization value for: " + value.getClass().getName() + " = null"); - } - return byteBuffer; - } - - /** - * Select a family column in the keyspace. - * @param cassandraQuery a wrapper of the query - * @param family the family name to be queried - * @return a list of family rows - */ - public List<Row<K, ByteBuffer, ByteBuffer>> execute(CassandraQuery<K, T> cassandraQuery, String family) { - - String[] columnNames = cassandraQuery.getColumns(family); - ByteBuffer[] columnNameByteBuffers = new ByteBuffer[columnNames.length]; - for (int i = 0; i < columnNames.length; i++) { - columnNameByteBuffers[i] = StringSerializer.get().toByteBuffer(columnNames[i]); - } - Query<K, T> query = cassandraQuery.getQuery(); - int limit = (int) query.getLimit(); - if (limit < 1) { - limit = Integer.MAX_VALUE; - } - K startKey = query.getStartKey(); - K endKey = query.getEndKey(); - - RangeSlicesQuery<K, ByteBuffer, ByteBuffer> rangeSlicesQuery = HFactory.createRangeSlicesQuery - (this.keyspace, this.keySerializer, ByteBufferSerializer.get(), ByteBufferSerializer.get()); - rangeSlicesQuery.setColumnFamily(family); - rangeSlicesQuery.setKeys(startKey, endKey); - rangeSlicesQuery.setRange(ByteBuffer.wrap(new byte[0]), ByteBuffer.wrap(new byte[0]), false, GoraRecordReader.BUFFER_LIMIT_READ_VALUE); - rangeSlicesQuery.setRowCount(limit); - rangeSlicesQuery.setColumnNames(columnNameByteBuffers); - - QueryResult<OrderedRows<K, ByteBuffer, ByteBuffer>> queryResult = rangeSlicesQuery.execute(); - OrderedRows<K, ByteBuffer, ByteBuffer> orderedRows = queryResult.get(); - - return orderedRows.getList(); - } - - private String getMappingFamily(String pField){ - String family = null; - // checking if it was a UNION field the one we are retrieving - if (pField.indexOf(CassandraStore.UNION_COL_SUFIX) > 0) - family = this.cassandraMapping.getFamily(pField.substring(0,pField.indexOf(CassandraStore.UNION_COL_SUFIX))); - else - family = this.cassandraMapping.getFamily(pField); - return family; - } - - private String getMappingColumn(String pField){ - String column = null; - if (pField.indexOf(CassandraStore.UNION_COL_SUFIX) > 0) - column = pField; - else - column = this.cassandraMapping.getColumn(pField); - return column; - } - - /** - * Select the families that contain at least one column mapped to a query field. - * @param query indicates the columns to select - * @return a map which keys are the family names and values the - * corresponding column names required to get all the query fields. - */ - public Map<String, List<String>> getFamilyMap(Query<K, T> query) { - Map<String, List<String>> map = new HashMap<>(); - Schema persistentSchema = query.getDataStore().newPersistent().getSchema(); - for (String field: query.getFields()) { - String family = this.getMappingFamily(field); - String column = this.getMappingColumn(field); - - // check if the family value was already initialized - List<String> list = map.get(family); - if (list == null) { - list = new ArrayList<>(); - map.put(family, list); - } - if (persistentSchema.getField(field).schema().getType() == Type.UNION) - list.add(field + CassandraStore.UNION_COL_SUFIX); - if (column != null) { - list.add(column); - } - } - - return map; - } - - /** - * Retrieves the cassandraMapping which holds whatever was mapped - * from the gora-cassandra-mapping.xml - * @return - */ - public CassandraMapping getCassandraMapping(){ - return this.cassandraMapping; - } - - /** - * Select the field names according to the column names, which format - * if fully qualified: "family:column" - * @param query - * @return a map which keys are the fully qualified column - * names and values the query fields - */ - public Map<String, String> getReverseMap(Query<K, T> query) { - Map<String, String> map = new HashMap<>(); - Schema persistentSchema = query.getDataStore().newPersistent().getSchema(); - for (String field: query.getFields()) { - String family = this.getMappingFamily(field); - String column = this.getMappingColumn(field); - if (persistentSchema.getField(field).schema().getType() == Type.UNION) - map.put(family + ":" + field + CassandraStore.UNION_COL_SUFIX, field + CassandraStore.UNION_COL_SUFIX); - map.put(family + ":" + column, field); - } - - return map; - } - - /** - * Determines if a column is a superColumn or not. - * @param family - * @return boolean - */ - public boolean isSuper(String family) { - return this.cassandraMapping.isSuper(family); - } - - public List<SuperRow<K, String, ByteBuffer, ByteBuffer>> executeSuper(CassandraQuery<K, T> cassandraQuery, String family) { - String[] columnNames = cassandraQuery.getColumns(family); - Query<K, T> query = cassandraQuery.getQuery(); - int limit = (int) query.getLimit(); - if (limit < 1) { - limit = Integer.MAX_VALUE; - } - K startKey = query.getStartKey(); - K endKey = query.getEndKey(); - - RangeSuperSlicesQuery<K, String, ByteBuffer, ByteBuffer> rangeSuperSlicesQuery = HFactory.createRangeSuperSlicesQuery - (this.keyspace, this.keySerializer, StringSerializer.get(), ByteBufferSerializer.get(), ByteBufferSerializer.get()); - rangeSuperSlicesQuery.setColumnFamily(family); - rangeSuperSlicesQuery.setKeys(startKey, endKey); - rangeSuperSlicesQuery.setRange("", "", false, GoraRecordReader.BUFFER_LIMIT_READ_VALUE); - rangeSuperSlicesQuery.setRowCount(limit); - rangeSuperSlicesQuery.setColumnNames(columnNames); - - - QueryResult<OrderedSuperRows<K, String, ByteBuffer, ByteBuffer>> queryResult = rangeSuperSlicesQuery.execute(); - OrderedSuperRows<K, String, ByteBuffer, ByteBuffer> orderedRows = queryResult.get(); - return orderedRows.getList(); - - - } - - /** - * Obtain Schema/Keyspace name - * @return Keyspace - */ - public String getKeyspaceName() { - return this.cassandraMapping.getKeyspaceName(); - } -} http://git-wip-us.apache.org/repos/asf/gora/blob/cbed41d0/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraMapping.java ---------------------------------------------------------------------- diff --git a/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraMapping.java b/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraMapping.java deleted file mode 100644 index b7a7087..0000000 --- a/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraMapping.java +++ /dev/null @@ -1,329 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.gora.cassandra.store; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import me.prettyprint.cassandra.model.BasicColumnFamilyDefinition; -import me.prettyprint.cassandra.service.ThriftCfDef; -import me.prettyprint.hector.api.ddl.ColumnFamilyDefinition; -import me.prettyprint.hector.api.ddl.ColumnType; -import me.prettyprint.hector.api.ddl.ComparatorType; - -import org.jdom.Element; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Mapping definitions for CouchDB. - */ -public class CassandraMapping { - - public static final Logger LOG = LoggerFactory.getLogger(CassandraMapping.class); - - private static final String NAME_ATTRIBUTE = "name"; - private static final String COLUMN_ATTRIBUTE = "qualifier"; - private static final String FAMILY_ATTRIBUTE = "family"; - private static final String SUPER_ATTRIBUTE = "type"; - private static final String CLUSTER_ATTRIBUTE = "cluster"; - private static final String HOST_ATTRIBUTE = "host"; - - private static final String GCGRACE_SECONDS_ATTRIBUTE = "gc_grace_seconds"; - private static final String COLUMNS_TTL_ATTRIBUTE = "ttl"; - private static final String REPLICATION_FACTOR_ATTRIBUTE = "replication_factor"; - private static final String REPLICATION_STRATEGY_ATTRIBUTE = "placement_strategy"; - - public static final String DEFAULT_REPLICATION_FACTOR = "1"; - public static final String DEFAULT_REPLICATION_STRATEGY = "org.apache.cassandra.locator.SimpleStrategy"; - public static final String DEFAULT_COLUMNS_TTL = "0"; - public static final String DEFAULT_GCGRACE_SECONDS = "0"; - - private String hostName; - private String clusterName; - private String keyspaceName; - private String keyspaceStrategy; - private int keyspaceRF; - - - /** - * List of the super column families. - */ - private List<String> superFamilies = new ArrayList<>(); - - /** - * Look up the column family associated to the Avro field. - */ - private Map<String, String> familyMap = new HashMap<>(); - - /** - * Look up the column associated to the Avro field. - */ - private Map<String, String> columnMap = new HashMap<>(); - - /** - * Helps storing attributes defined for each field. - */ - private Map<String, String> columnAttrMap = new HashMap<>(); - - /** - * Look up the column family from its name. - */ - private Map<String, BasicColumnFamilyDefinition> columnFamilyDefinitions = - new HashMap<>(); - - - /** - * Simply gets the Cassandra host name. - * @return hostName - */ - public String getHostName() { - return this.hostName; - } - - /** - * Simply gets the Cassandra cluster (the machines (nodes) - * in a logical Cassandra instance) name. - * Clusters can contain multiple keyspaces. - * @return clusterName - */ - public String getClusterName() { - return this.clusterName; - } - - /** - * Simply gets the Cassandra namespace for ColumnFamilies, typically one per application - * @return - */ - public String getKeyspaceName() { - return this.keyspaceName; - } - - /** - * gets the replication strategy - * @return string class name to be used for strategy - */ - public String getKeyspaceReplicationStrategy() { - return this.keyspaceStrategy; - } - - /** - * gets the replication factor - * @return int replication factor - */ - public int getKeyspaceReplicationFactor() { - return this.keyspaceRF; - } - - /** - * Primary class for loading Cassandra configuration from the 'MAPPING_FILE'. - * It should be noted that should the "qualifier" attribute and its associated - * value be absent from class field definition, it will automatically be set to - * the field name value. - * - */ - @SuppressWarnings("unchecked") - public CassandraMapping(Element keyspace, Element mapping) { - if (keyspace == null) { - LOG.error("Keyspace element should not be null!"); - return; - } else { - if (LOG.isDebugEnabled()) { - LOG.debug("Located Cassandra Keyspace"); - } - } - this.keyspaceName = keyspace.getAttributeValue(NAME_ATTRIBUTE); - if (this.keyspaceName == null) { - LOG.error("Error locating Cassandra Keyspace name attribute!"); - } else { - if (LOG.isDebugEnabled()) { - LOG.debug("Located Cassandra Keyspace name: '" + keyspaceName + "'"); - } - } - this.clusterName = keyspace.getAttributeValue(CLUSTER_ATTRIBUTE); - if (this.clusterName == null) { - LOG.error("Error locating Cassandra Keyspace cluster attribute!"); - } else { - if (LOG.isDebugEnabled()) { - LOG.debug("Located Cassandra Keyspace cluster: '" + clusterName + "'"); - } - } - this.hostName = keyspace.getAttributeValue(HOST_ATTRIBUTE); - if (this.hostName == null) { - LOG.error("Error locating Cassandra Keyspace host attribute!"); - } else { - if (LOG.isDebugEnabled()) { - LOG.debug("Located Cassandra Keyspace host: '" + hostName + "'"); - } - } - - // setting replication strategy - this.keyspaceStrategy = keyspace.getAttributeValue( REPLICATION_STRATEGY_ATTRIBUTE ); - if( null == this.keyspaceStrategy ) { - this.keyspaceStrategy = DEFAULT_REPLICATION_STRATEGY; - } - if( LOG.isDebugEnabled() ) { - LOG.debug( "setting Keyspace replication strategy to " + this.keyspaceStrategy ); - } - - // setting replication factor - String tmp = keyspace.getAttributeValue( REPLICATION_FACTOR_ATTRIBUTE ); - if( null == tmp ) { - tmp = DEFAULT_REPLICATION_FACTOR; - } - this.keyspaceRF = Integer.parseInt( tmp ); - if( LOG.isDebugEnabled() ) { - LOG.debug( "setting Keyspace replication factor to " + this.keyspaceRF ); - } - - - // load column family definitions - List<Element> elements = keyspace.getChildren(); - for (Element element: elements) { - BasicColumnFamilyDefinition cfDef = new BasicColumnFamilyDefinition(); - - String familyName = element.getAttributeValue(NAME_ATTRIBUTE); - if (familyName == null) { - LOG.error("Error locating column family name attribute!"); - continue; - } else { - if (LOG.isDebugEnabled()) { - LOG.debug("Located column family: '" + familyName + "'" ); - } - } - String gcgrace_scs = element.getAttributeValue(GCGRACE_SECONDS_ATTRIBUTE); - if (gcgrace_scs == null) { - LOG.warn("Error locating gc_grace_seconds attribute for '" + familyName + "' column family"); - LOG.warn("Using gc_grace_seconds default value which is: " + DEFAULT_GCGRACE_SECONDS - + " and is viable ONLY FOR A SINGLE NODE CLUSTER"); - LOG.warn("please update the gora-cassandra-mapping.xml file to avoid seeing this warning"); - } else { - if (LOG.isDebugEnabled()) { - LOG.debug("Located gc_grace_seconds: '" + gcgrace_scs + "'" ); - } - } - - String superAttribute = element.getAttributeValue(SUPER_ATTRIBUTE); - if (superAttribute != null) { - if (LOG.isDebugEnabled()) { - LOG.debug("Located super column family"); - } - this.superFamilies.add(familyName); - if (LOG.isDebugEnabled()) { - LOG.debug("Added super column family: '" + familyName + "'"); - } - cfDef.setColumnType(ColumnType.SUPER); - cfDef.setSubComparatorType(ComparatorType.BYTESTYPE); - } - - cfDef.setKeyspaceName(this.keyspaceName); - cfDef.setName(familyName); - cfDef.setComparatorType(ComparatorType.BYTESTYPE); - cfDef.setDefaultValidationClass(ComparatorType.BYTESTYPE.getClassName()); - - cfDef.setGcGraceSeconds(Integer.parseInt( gcgrace_scs!=null?gcgrace_scs:DEFAULT_GCGRACE_SECONDS)); - this.columnFamilyDefinitions.put(familyName, cfDef); - - } - - // load column definitions - elements = mapping.getChildren(); - for (Element element: elements) { - String fieldName = element.getAttributeValue(NAME_ATTRIBUTE); - String familyName = element.getAttributeValue(FAMILY_ATTRIBUTE); - String columnName = element.getAttributeValue(COLUMN_ATTRIBUTE); - String ttlValue = element.getAttributeValue(COLUMNS_TTL_ATTRIBUTE); - if (fieldName == null) { - LOG.error("Field name is not declared."); - continue; - } - if (familyName == null) { - LOG.error("Family name is not declared for \"" + fieldName + "\" field."); - continue; - } - if (columnName == null) { - LOG.warn("Column name (qualifier) is not declared for \"" + fieldName + "\" field."); - columnName = fieldName; - } - if (ttlValue == null) { - LOG.warn("TTL value is not defined for \"" + fieldName + "\" field. \n Using default value: " + DEFAULT_COLUMNS_TTL); - } - - BasicColumnFamilyDefinition columnFamilyDefinition = this.columnFamilyDefinitions.get(familyName); - if (columnFamilyDefinition == null) { - LOG.warn("Family " + familyName + " was not declared in the keyspace."); - } - - this.familyMap.put(fieldName, familyName); - this.columnMap.put(fieldName, columnName); - // TODO we should find a way of storing more values into this map - this.columnAttrMap.put(columnName, ttlValue!=null?ttlValue:DEFAULT_COLUMNS_TTL); - } - } - - /** - * Add new column to the CassandraMapping using the the below parameters - * @param pFamilyName the column family name - * @param pFieldName the Avro field from the Schema - * @param pColumnName the column name within the column family. - */ - public void addColumn(String pFamilyName, String pFieldName, String pColumnName){ - this.familyMap.put(pFieldName, pFamilyName); - this.columnMap.put(pFieldName, pColumnName); - } - - public String getFamily(String name) { - return this.familyMap.get(name); - } - - public String getColumn(String name) { - return this.columnMap.get(name); - } - - public Map<String,String> getFamilyMap(){ - return this.familyMap; - } - - public Map<String, String> getColumnsAttribs(){ - return this.columnAttrMap; - } - - /** - * Read family super attribute. - * @param family the family name - * @return true is the family is a super column family - */ - public boolean isSuper(String family) { - return this.superFamilies.indexOf(family) != -1; - } - - public List<ColumnFamilyDefinition> getColumnFamilyDefinitions() { - List<ColumnFamilyDefinition> list = new ArrayList<>(); - for (String key: this.columnFamilyDefinitions.keySet()) { - ColumnFamilyDefinition columnFamilyDefinition = this.columnFamilyDefinitions.get(key); - ThriftCfDef thriftCfDef = new ThriftCfDef(columnFamilyDefinition); - list.add(thriftCfDef); - } - - return list; - } - -} http://git-wip-us.apache.org/repos/asf/gora/blob/cbed41d0/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraMappingManager.java ---------------------------------------------------------------------- diff --git a/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraMappingManager.java b/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraMappingManager.java deleted file mode 100644 index 6b46eec..0000000 --- a/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraMappingManager.java +++ /dev/null @@ -1,158 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.gora.cassandra.store; - -import java.io.IOException; -import java.io.InputStream; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import org.jdom.Document; -import org.jdom.Element; -import org.jdom.JDOMException; -import org.jdom.input.SAXBuilder; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * A builder for creating the mapper. - */ -public class CassandraMappingManager { - - public static final Logger LOG = LoggerFactory.getLogger(CassandraMappingManager.class); - - private static final String MAPPING_FILE = "gora-cassandra-mapping.xml"; - private static final String KEYSPACE_ELEMENT = "keyspace"; - private static final String NAME_ATTRIBUTE = "name"; - private static final String MAPPING_ELEMENT = "class"; - private static final String KEYCLASS_ATTRIBUTE = "keyClass"; - private static final String HOST_ATTRIBUTE = "host"; - private static final String CLUSTER_ATTRIBUTE = "cluster"; - // singleton - private static CassandraMappingManager manager = new CassandraMappingManager(); - - public static CassandraMappingManager getManager() { - return manager; - } - - /** - * Objects to maintain mapped keyspaces - */ - private Map<String, Element> keyspaceMap = null; - private Map<String, Element> mappingMap = null; - - private CassandraMappingManager() { - keyspaceMap = new HashMap<>(); - mappingMap = new HashMap<>(); - try { - loadConfiguration(); - } - catch (JDOMException | IOException e) { - LOG.error(e.toString()); - } - } - - public CassandraMapping get(Class<?> persistentClass) { - String className = persistentClass.getName(); - Element mappingElement = mappingMap.get(className); - if (mappingElement == null) { - LOG.error("Mapping element does not exist for className=" + className); - return null; - } - String keyspaceName = mappingElement.getAttributeValue(KEYSPACE_ELEMENT); - if (LOG.isDebugEnabled()) { - LOG.debug("persistentClassName=" + className + " -> keyspaceName=" + keyspaceName); - } - Element keyspaceElement = keyspaceMap.get(keyspaceName); - if (keyspaceElement == null) { - LOG.error("Keyspace element does not exist for keyspaceName=" + keyspaceName); - return null; - } - return new CassandraMapping(keyspaceElement, mappingElement); - } - - /** - * Primary class for loading Cassandra configuration from the 'MAPPING_FILE'. - * - * @throws JDOMException - * @throws IOException - */ - @SuppressWarnings("unchecked") - public void loadConfiguration() throws JDOMException, IOException { - SAXBuilder saxBuilder = new SAXBuilder(); - // get mapping file - InputStream inputStream = getClass().getClassLoader().getResourceAsStream(MAPPING_FILE); - if (inputStream == null){ - LOG.warn("Mapping file '" + MAPPING_FILE + "' could not be found!"); - throw new IOException("Mapping file '" + MAPPING_FILE + "' could not be found!"); - } - Document document = saxBuilder.build(inputStream); - if (document == null) { - LOG.warn("Mapping file '" + MAPPING_FILE + "' could not be found!"); - throw new IOException("Mapping file '" + MAPPING_FILE + "' could not be found!"); - } - Element root = document.getRootElement(); - // find cassandra keyspace element - List<Element> keyspaces = root.getChildren(KEYSPACE_ELEMENT); - if (keyspaces == null || keyspaces.size() == 0) { - LOG.error("Error locating Cassandra Keyspace element!"); - } - else { - for (Element keyspace : keyspaces) { - // log name, cluster and host for given keyspace(s) - String keyspaceName = keyspace.getAttributeValue(NAME_ATTRIBUTE); - String clusterName = keyspace.getAttributeValue(CLUSTER_ATTRIBUTE); - String hostName = keyspace.getAttributeValue(HOST_ATTRIBUTE); - if (LOG.isDebugEnabled()) { - LOG.debug("Located Cassandra Keyspace: '" + keyspaceName + "' in cluster '" + clusterName + - "' on host '" + hostName + "'."); - } - if (keyspaceName == null) { - LOG.error("Error locating Cassandra Keyspace name attribute!"); - continue; - } - keyspaceMap.put(keyspaceName, keyspace); - } - } - - // load column definitions - List<Element> mappings = root.getChildren(MAPPING_ELEMENT); - if (mappings == null || mappings.size() == 0) { - LOG.error("Error locating Cassandra Mapping class element!"); - } - else { - for (Element mapping : mappings) { - // associate persistent and class names for keyspace(s) - String className = mapping.getAttributeValue(NAME_ATTRIBUTE); - String keyClassName = mapping.getAttributeValue(KEYCLASS_ATTRIBUTE); - String keyspaceName = mapping.getAttributeValue(KEYSPACE_ELEMENT); - if (LOG.isDebugEnabled()) { - LOG.debug("Located Cassandra Mapping: keyClass: '" + keyClassName + "' in storage class '" - + className + "' for Keyspace '" + keyspaceName + "'."); - } - if (className == null) { - LOG.error("Error locating Cassandra Mapping class name attribute!"); - continue; - } - mappingMap.put(className, mapping); - } - } - } -} http://git-wip-us.apache.org/repos/asf/gora/blob/cbed41d0/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java ---------------------------------------------------------------------- diff --git a/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java b/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java deleted file mode 100644 index 5f21aca..0000000 --- a/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java +++ /dev/null @@ -1,668 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.gora.cassandra.store; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.Properties; -import java.util.Set; -import java.util.Collections; -import java.util.concurrent.ConcurrentHashMap; - -import me.prettyprint.hector.api.beans.ColumnSlice; -import me.prettyprint.hector.api.beans.HColumn; -import me.prettyprint.hector.api.beans.HSuperColumn; -import me.prettyprint.hector.api.beans.Row; -import me.prettyprint.hector.api.beans.SuperRow; -import me.prettyprint.hector.api.beans.SuperSlice; - -import org.apache.avro.Schema; -import org.apache.avro.Schema.Field; -import org.apache.avro.Schema.Type; -import org.apache.avro.generic.GenericArray; -import org.apache.avro.generic.GenericData.Array; -import org.apache.avro.io.BinaryEncoder; -import org.apache.avro.specific.SpecificData; -import org.apache.avro.specific.SpecificDatumWriter; -import org.apache.gora.cassandra.query.CassandraQuery; -import org.apache.gora.cassandra.query.CassandraResult; -import org.apache.gora.cassandra.query.CassandraResultSet; -import org.apache.gora.cassandra.query.CassandraRow; -import org.apache.gora.cassandra.query.CassandraSubColumn; -import org.apache.gora.cassandra.query.CassandraSuperColumn; -import org.apache.gora.persistency.Persistent; -import org.apache.gora.persistency.impl.DirtyListWrapper; -import org.apache.gora.persistency.impl.PersistentBase; -import org.apache.gora.query.PartitionQuery; -import org.apache.gora.query.Query; -import org.apache.gora.query.Result; -import org.apache.gora.query.impl.PartitionQueryImpl; -import org.apache.gora.store.DataStoreFactory; -import org.apache.gora.store.impl.DataStoreBase; -import org.apache.gora.cassandra.serializers.AvroSerializerUtil; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * {@link org.apache.gora.cassandra.store.CassandraStore} is the primary class - * responsible for directing Gora CRUD operations into Cassandra. We (delegate) rely - * heavily on {@link org.apache.gora.cassandra.store.CassandraClient} for many operations - * such as initialization, creating and deleting schemas (Cassandra Keyspaces), etc. - */ -public class CassandraStore<K, T extends PersistentBase> extends DataStoreBase<K, T> { - - /** Logging implementation */ - public static final Logger LOG = LoggerFactory.getLogger(CassandraStore.class); - - /** Consistency property level for Cassandra column families */ - private static final String COL_FAM_CL = "cf.consistency.level"; - - /** Consistency property level for Cassandra read operations. */ - private static final String READ_OP_CL = "read.consistency.level"; - - /** Consistency property level for Cassandra write operations. */ - private static final String WRITE_OP_CL = "write.consistency.level"; - - /** Variables to hold different consistency levels defined by the properties. */ - public static String colFamConsLvl; - public static String readOpConsLvl; - public static String writeOpConsLvl; - - private CassandraClient<K, T> cassandraClient = new CassandraClient<>(); - - /** - * Fixed string with value "UnionIndex" used to generate an extra column based on - * the original field's name - */ - public static final String UNION_COL_SUFIX = "_UnionIndex"; - - /** - * Default schema index with value "0" used when AVRO Union data types are stored - */ - public static final int DEFAULT_UNION_SCHEMA = 0; - - /** - * The values are Avro fields pending to be stored. - * - * We want to iterate over the keys in insertion order. - * We don't want to lock the entire collection before iterating over the keys, - * since in the meantime other threads are adding entries to the map. - */ - private Map<K, T> buffer = Collections.synchronizedMap(new LinkedHashMap<K, T>()); - - public static final ThreadLocal<BinaryEncoder> encoders = - new ThreadLocal<>(); - - /** - * Create a {@link java.util.concurrent.ConcurrentHashMap} for the - * datum readers and writers. - * This is necessary because they are not thread safe, at least not before - * Avro 1.4.0 (See AVRO-650). - * When they are thread safe, it is possible to maintain a single reader and - * writer pair for every schema, instead of one for every thread. - * @see <a href="https://issues.apache.org/jira/browse/AVRO-650">AVRO-650</a> - */ - public static final ConcurrentHashMap<String, SpecificDatumWriter<?>> writerMap = - new ConcurrentHashMap<>(); - - /** The default constructor for CassandraStore */ - public CassandraStore() throws Exception { - } - - /** - * Initialize is called when then the call to - * {@link org.apache.gora.store.DataStoreFactory#createDataStore} - * is made. In this case, we merely delegate the store initialization to the - * {@link org.apache.gora.cassandra.store.CassandraClient#initialize}. - * - * @param keyClass - * @param persistent - * @param properties - */ - public void initialize(Class<K> keyClass, Class<T> persistent, Properties properties) { - try { - super.initialize(keyClass, persistent, properties); - if (autoCreateSchema) { - // If this is not set, then each Cassandra client should set its default - // column family - colFamConsLvl = DataStoreFactory.findProperty(properties, this, COL_FAM_CL, null); - // operations - readOpConsLvl = DataStoreFactory.findProperty(properties, this, READ_OP_CL, null); - writeOpConsLvl = DataStoreFactory.findProperty(properties, this, WRITE_OP_CL, null); - } - this.cassandraClient.initialize(keyClass, persistent, properties); - } catch (Exception e) { - LOG.error(e.getMessage(), e); - } - } - - @Override - public void close() { - LOG.debug("close"); - flush(); - } - - @Override - public void createSchema() { - LOG.debug("creating Cassandra keyspace"); - this.cassandraClient.checkKeyspace(); - } - - @Override - public boolean delete(K key) { - this.cassandraClient.deleteByKey(key); - return true; - } - - @Override - public long deleteByQuery(Query<K, T> query) { - LOG.debug("delete by query " + query); - return 0; - } - - @Override - public void deleteSchema() { - LOG.debug("delete schema"); - this.cassandraClient.dropKeyspace(); - } - - /** - * When executing Gora Queries in Cassandra we query the Cassandra keyspace by families. - * When we add sub/supercolumns, Gora keys are mapped to Cassandra partition keys only. - * This is because we follow the Cassandra logic where column family data is - * partitioned across nodes based on row Key. - */ - @Override - public Result<K, T> execute(Query<K, T> query) { - - Map<String, List<String>> familyMap = this.cassandraClient.getFamilyMap(query); - Map<String, String> reverseMap = this.cassandraClient.getReverseMap(query); - - CassandraQuery<K, T> cassandraQuery = new CassandraQuery<>(); - cassandraQuery.setQuery(query); - cassandraQuery.setFamilyMap(familyMap); - - CassandraResult<K, T> cassandraResult = new CassandraResult<>(this, query); - cassandraResult.setReverseMap(reverseMap); - - CassandraResultSet<K> cassandraResultSet = new CassandraResultSet<>(); - - // We query Cassandra keyspace by families. - for (String family : familyMap.keySet()) { - if (family == null) { - continue; - } - if (this.cassandraClient.isSuper(family)) { - addSuperColumns(family, cassandraQuery, cassandraResultSet); - - } else { - addSubColumns(family, cassandraQuery, cassandraResultSet); - } - } - - cassandraResult.setResultSet(cassandraResultSet); - - return cassandraResult; - } - - /** - * When we add subcolumns, Gora keys are mapped to Cassandra partition keys only. - * This is because we follow the Cassandra logic where column family data is - * partitioned across nodes based on row Key. - */ - private void addSubColumns(String family, CassandraQuery<K, T> cassandraQuery, - CassandraResultSet<K> cassandraResultSet) { - // select family columns that are included in the query - List<Row<K, ByteBuffer, ByteBuffer>> rows = this.cassandraClient.execute(cassandraQuery, family); - - for (Row<K, ByteBuffer, ByteBuffer> row : rows) { - K key = row.getKey(); - - // find associated row in the resultset - CassandraRow<K> cassandraRow = cassandraResultSet.getRow(key); - if (cassandraRow == null) { - cassandraRow = new CassandraRow<>(); - cassandraResultSet.putRow(key, cassandraRow); - cassandraRow.setKey(key); - } - - ColumnSlice<ByteBuffer, ByteBuffer> columnSlice = row.getColumnSlice(); - - for (HColumn<ByteBuffer, ByteBuffer> hColumn : columnSlice.getColumns()) { - CassandraSubColumn cassandraSubColumn = new CassandraSubColumn(); - cassandraSubColumn.setValue(hColumn); - cassandraSubColumn.setFamily(family); - cassandraRow.add(cassandraSubColumn); - } - - } - } - - /** - * When we add supercolumns, Gora keys are mapped to Cassandra partition keys only. - * This is because we follow the Cassandra logic where column family data is - * partitioned across nodes based on row Key. - */ - private void addSuperColumns(String family, CassandraQuery<K, T> cassandraQuery, - CassandraResultSet<K> cassandraResultSet) { - - List<SuperRow<K, String, ByteBuffer, ByteBuffer>> superRows = this.cassandraClient.executeSuper(cassandraQuery, family); - for (SuperRow<K, String, ByteBuffer, ByteBuffer> superRow: superRows) { - K key = superRow.getKey(); - CassandraRow<K> cassandraRow = cassandraResultSet.getRow(key); - if (cassandraRow == null) { - cassandraRow = new CassandraRow<>(); - cassandraResultSet.putRow(key, cassandraRow); - cassandraRow.setKey(key); - } - - SuperSlice<String, ByteBuffer, ByteBuffer> superSlice = superRow.getSuperSlice(); - for (HSuperColumn<String, ByteBuffer, ByteBuffer> hSuperColumn: superSlice.getSuperColumns()) { - CassandraSuperColumn cassandraSuperColumn = new CassandraSuperColumn(); - cassandraSuperColumn.setValue(hSuperColumn); - cassandraSuperColumn.setFamily(family); - cassandraRow.add(cassandraSuperColumn); - } - } - } - - /** - * Flush the buffer which is a synchronized {@link java.util.LinkedHashMap} - * storing fields pending to be stored by - * {@link org.apache.gora.cassandra.store.CassandraStore#put(Object, PersistentBase)} - * operations. Invoking this method therefore writes the buffered rows - * into Cassandra. - * @see org.apache.gora.store.DataStore#flush() - */ - @Override - public void flush() { - - Set<K> keys = this.buffer.keySet(); - - // this duplicates memory footprint - @SuppressWarnings("unchecked") - K[] keyArray = (K[]) keys.toArray(); - - // iterating over the key set directly would throw - //ConcurrentModificationException with java.util.HashMap and subclasses - for (K key: keyArray) { - T value = this.buffer.get(key); - if (value == null) { - LOG.info("Value to update is null for key: " + key); - continue; - } - Schema schema = value.getSchema(); - - for (Field field: schema.getFields()) { - if (value.isDirty(field.pos())) { - addOrUpdateField(key, field, field.schema(), value.get(field.pos())); - } - } - } - - // remove flushed rows from the buffer as all - // added or updated fields should now have been written. - for (K key: keyArray) { - this.buffer.remove(key); - } - } - - @Override - public T get(K key, String[] fields) { - CassandraQuery<K,T> query = new CassandraQuery<>(); - query.setDataStore(this); - query.setKeyRange(key, key); - - if (fields == null){ - fields = this.getFields(); - } - query.setFields(fields); - - query.setLimit(1); - Result<K,T> result = execute(query); - boolean hasResult = false; - try { - hasResult = result.next(); - } catch (Exception e) { - LOG.error(e.getMessage()); - throw new RuntimeException(e); - } - return hasResult ? result.get() : null; - } - - @Override - public List<PartitionQuery<K, T>> getPartitions(Query<K, T> query) - throws IOException { - // TODO GORA-298 Implement CassandraStore#getPartitions - List<PartitionQuery<K,T>> partitions = new ArrayList<>(); - PartitionQueryImpl<K, T> pqi = new PartitionQueryImpl<>(query); - pqi.setConf(getConf()); - partitions.add(pqi); - return partitions; - } - - /** - * In Cassandra Schemas are referred to as Keyspaces - * @return Keyspace - */ - @Override - public String getSchemaName() { - return this.cassandraClient.getKeyspaceName(); - } - - @Override - public Query<K, T> newQuery() { - Query<K,T> query = new CassandraQuery<>(this); - query.setFields(getFieldsToQuery(null)); - return query; - } - - /** - * When doing the - * {@link org.apache.gora.cassandra.store.CassandraStore#put(Object, PersistentBase)} - * operation, the logic is as follows: - * <ol> - * <li>Obtain the Avro {@link org.apache.avro.Schema} for the object.</li> - * <li>Create a new duplicate instance of the object (explained in more detail below) **.</li> - * <li>Obtain a {@link java.util.List} of the {@link org.apache.avro.Schema} - * {@link org.apache.avro.Schema.Field}'s.</li> - * <li>Iterate through the field {@link java.util.List}. This allows us to - * consequently process each item.</li> - * <li>Check to see if the {@link org.apache.avro.Schema.Field} is NOT dirty. - * If this condition is true then we DO NOT process this field.</li> - * <li>Obtain the element at the specified position in this list so we can - * directly operate on it.</li> - * <li>Obtain the {@link org.apache.avro.Schema.Type} of the element obtained - * above and process it accordingly. N.B. For nested type ARRAY, MAP - * RECORD or UNION, we shadow the checks in bullet point 5 above to infer that the - * {@link org.apache.avro.Schema.Field} is either at - * position 0 OR it is NOT dirty. If one of these conditions is true then we DO NOT - * process this field. This is carried out in - * {@link org.apache.gora.cassandra.store.CassandraStore#getFieldValue(Schema, Type, Object)}</li> - * <li>We then insert the Key and Object into the {@link java.util.LinkedHashMap} buffer - * before being flushed. This performs a structural modification of the map.</li> - * </ol> - * ** We create a duplicate instance of the object to be persisted and insert processed - * objects into a synchronized {@link java.util.LinkedHashMap}. This allows - * us to keep all the objects in memory till flushing. - * - * @param key for the Avro Record (object). - * @param value Record object to be persisted in Cassandra - * @see org.apache.gora.store.DataStore#put(java.lang.Object,org.apache.gora.persistency.Persistent) - */ - @Override - public void put(K key, T value) { - Schema schema = value.getSchema(); - @SuppressWarnings("unchecked") - T p = (T) SpecificData.get().newRecord(value, schema); - List<Field> fields = schema.getFields(); - for (int i = 1; i < fields.size(); i++) { - if (!value.isDirty(i)) { - continue; - } - Field field = fields.get(i); - Type type = field.schema().getType(); - Object fieldValue = value.get(field.pos()); - Schema fieldSchema = field.schema(); - // check if field has a nested structure (array, map, record or union) - fieldValue = getFieldValue(fieldSchema, type, fieldValue); - p.put(field.pos(), fieldValue); - } - // this performs a structural modification of the map - this.buffer.put(key, p); - } - - /** - * For every field within an object, we pass in a field schema, Type and value. - * This enables us to process fields (based on their characteristics) - * preparing them for persistence. - * @param fieldSchema the associated field schema - * @param type the field type - * @param fieldValue the field value. - * @return - */ - private Object getFieldValue(Schema fieldSchema, Type type, Object fieldValue ){ - switch(type) { - case RECORD: - PersistentBase persistent = (PersistentBase) fieldValue; - PersistentBase newRecord = (PersistentBase) SpecificData.get().newRecord(persistent, persistent.getSchema()); - for (Field member: fieldSchema.getFields()) { - if (member.pos() == 0 || !persistent.isDirty()) { - continue; - } - Schema memberSchema = member.schema(); - Type memberType = memberSchema.getType(); - Object memberValue = persistent.get(member.pos()); - newRecord.put(member.pos(), getFieldValue(memberSchema, memberType, memberValue)); - } - fieldValue = newRecord; - break; - case MAP: - Map<?, ?> map = (Map<?, ?>) fieldValue; - fieldValue = map; - break; - case ARRAY: - fieldValue = (List<?>) fieldValue; - break; - case UNION: - // storing the union selected schema, the actual value will - // be stored as soon as we get break out. - if (fieldValue != null){ - int schemaPos = getUnionSchema(fieldValue,fieldSchema); - Schema unionSchema = fieldSchema.getTypes().get(schemaPos); - Type unionType = unionSchema.getType(); - fieldValue = getFieldValue(unionSchema, unionType, fieldValue); - } - //p.put( schemaPos, p.getSchema().getField(field.name() + CassandraStore.UNION_COL_SUFIX)); - //p.put(fieldPos, fieldValue); - break; - default: - break; - } - return fieldValue; - } - - /** - * Add a field to Cassandra according to its type. - * @param key the key of the row where the field should be added - * @param field the Avro field representing a datum - * @param schema the schema belonging to the particular Avro field - * @param value the field value - */ - @SuppressWarnings({ "unchecked", "rawtypes" }) - private void addOrUpdateField(K key, Field field, Schema schema, Object value) { - Type type = schema.getType(); - // checking if the value to be updated is used for saving union schema - if (!field.name().contains(CassandraStore.UNION_COL_SUFIX)){ - switch (type) { - case STRING: - case BOOLEAN: - case INT: - case LONG: - case BYTES: - case FLOAT: - case DOUBLE: - case FIXED: - this.cassandraClient.addColumn(key, field.name(), value); - break; - case RECORD: - if (value != null) { - if (value instanceof PersistentBase) { - PersistentBase persistentBase = (PersistentBase) value; - try { - byte[] byteValue = AvroSerializerUtil.serializer(persistentBase, schema); - this.cassandraClient.addColumn(key, field.name(), byteValue); - } catch (IOException e) { - LOG.warn(field.name() + " named record could not be serialized."); - } - } else { - LOG.warn("Record with value: " + value.toString() + " not supported for field: " + field.name()); - } - } else { - LOG.warn("Setting content of: " + field.name() + " to null."); - String familyName = this.cassandraClient.getCassandraMapping().getFamily(field.name()); - this.cassandraClient.deleteColumn(key, familyName, this.cassandraClient.toByteBuffer(field.name())); - } - break; - case MAP: - if (value != null) { - if (value instanceof Map<?, ?>) { - Map<CharSequence,Object> map = (Map<CharSequence,Object>)value; - Schema valueSchema = schema.getValueType(); - Type valueType = valueSchema.getType(); - if (Type.UNION.equals(valueType)){ - Map<CharSequence,Object> valueMap = new HashMap<>(); - for (CharSequence mapKey: map.keySet()) { - Object mapValue = map.get(mapKey); - int valueUnionIndex = getUnionSchema(mapValue, valueSchema); - valueMap.put((mapKey+UNION_COL_SUFIX), valueUnionIndex); - valueMap.put(mapKey, mapValue); - } - map = valueMap; - } - - String familyName = this.cassandraClient.getCassandraMapping().getFamily(field.name()); - - // If map is not super column. We using Avro serializer. - if (!this.cassandraClient.isSuper( familyName )){ - try { - byte[] byteValue = AvroSerializerUtil.serializer(map, schema); - this.cassandraClient.addColumn(key, field.name(), byteValue); - } catch (IOException e) { - LOG.warn(field.name() + " named map could not be serialized."); - } - }else{ - this.cassandraClient.addStatefulHashMap(key, field.name(), map); - } - } else { - LOG.warn("Map with value: " + value.toString() + " not supported for field: " + field.name()); - } - } else { - // delete map - LOG.warn("Setting content of: " + field.name() + " to null."); - this.cassandraClient.deleteStatefulHashMap(key, field.name()); - } - break; - case ARRAY: - if (value != null) { - if (value instanceof DirtyListWrapper<?>) { - DirtyListWrapper fieldValue = (DirtyListWrapper<?>)value; - GenericArray valueArray = new Array(fieldValue.size(), schema); - for (int i = 0; i < fieldValue.size(); i++) { - valueArray.add(i, fieldValue.get(i)); - } - this.cassandraClient.addGenericArray(key, field.name(), (GenericArray<?>)valueArray); - } else { - LOG.warn("Array with value: " + value.toString() + " not supported for field: " + field.name()); - } - } else { - LOG.warn("Setting content of: " + field.name() + " to null."); - this.cassandraClient.deleteGenericArray(key, field.name()); - } - break; - case UNION: - // adding union schema index - String columnName = field.name() + UNION_COL_SUFIX; - String familyName = this.cassandraClient.getCassandraMapping().getFamily(field.name()); - if(value != null) { - int schemaPos = getUnionSchema(value, schema); - LOG.debug("Union with value: " + value.toString() + " at index: " + schemaPos + " supported for field: " + field.name()); - this.cassandraClient.getCassandraMapping().addColumn(familyName, columnName, columnName); - if (this.cassandraClient.isSuper( familyName )){ - this.cassandraClient.addSubColumn(key, columnName, columnName, schemaPos); - }else{ - this.cassandraClient.addColumn(key, columnName, schemaPos); - - } - //this.cassandraClient.getCassandraMapping().addColumn(familyName, columnName, columnName); - // adding union value - Schema unionSchema = schema.getTypes().get(schemaPos); - addOrUpdateField(key, field, unionSchema, value); - //this.cassandraClient.addColumn(key, field.name(), value); - } else { - LOG.warn("Setting content of: " + field.name() + " to null."); - if (this.cassandraClient.isSuper( familyName )){ - this.cassandraClient.deleteSubColumn(key, field.name()); - } else { - this.cassandraClient.deleteColumn(key, familyName, this.cassandraClient.toByteBuffer(field.name())); - } - } - break; - default: - LOG.warn("Type: " + type.name() + " not considered for field: " + field.name() + ". Please report this to d...@gora.apache.org"); - } - } - } - - /** - * Given an object and the object schema this function obtains, - * from within the UNION schema, the position of the type used. - * If no data type can be inferred then we return a default value - * of position 0. - * @param pValue - * @param pUnionSchema - * @return the unionSchemaPosition. - */ - private int getUnionSchema(Object pValue, Schema pUnionSchema){ - int unionSchemaPos = 0; -// String valueType = pValue.getClass().getSimpleName(); - for (Schema currentSchema : pUnionSchema.getTypes()) { - Type schemaType = currentSchema.getType(); - if (pValue instanceof CharSequence && schemaType.equals(Type.STRING)) - return unionSchemaPos; - else if (pValue instanceof ByteBuffer && schemaType.equals(Type.BYTES)) - return unionSchemaPos; - else if (pValue instanceof Integer && schemaType.equals(Type.INT)) - return unionSchemaPos; - else if (pValue instanceof Long && schemaType.equals(Type.LONG)) - return unionSchemaPos; - else if (pValue instanceof Double && schemaType.equals(Type.DOUBLE)) - return unionSchemaPos; - else if (pValue instanceof Float && schemaType.equals(Type.FLOAT)) - return unionSchemaPos; - else if (pValue instanceof Boolean && schemaType.equals(Type.BOOLEAN)) - return unionSchemaPos; - else if (pValue instanceof Map && schemaType.equals(Type.MAP)) - return unionSchemaPos; - else if (pValue instanceof List && schemaType.equals(Type.ARRAY)) - return unionSchemaPos; - else if (pValue instanceof Persistent && schemaType.equals(Type.RECORD)) - return unionSchemaPos; - unionSchemaPos++; - } - // if we weren't able to determine which data type it is, then we return the default - return DEFAULT_UNION_SCHEMA; - } - - /** - * Simple method to check if a Cassandra Keyspace exists. - * @return true if a Keyspace exists. - */ - @Override - public boolean schemaExists() { - LOG.info("schema exists"); - return cassandraClient.keyspaceExists(); - } - -} http://git-wip-us.apache.org/repos/asf/gora/blob/cbed41d0/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/HectorUtils.java ---------------------------------------------------------------------- diff --git a/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/HectorUtils.java b/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/HectorUtils.java deleted file mode 100644 index 3f33202..0000000 --- a/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/HectorUtils.java +++ /dev/null @@ -1,117 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.gora.cassandra.store; - -import java.nio.ByteBuffer; -import java.util.Arrays; - -import me.prettyprint.cassandra.serializers.ByteBufferSerializer; -import me.prettyprint.cassandra.serializers.IntegerSerializer; -import me.prettyprint.cassandra.serializers.StringSerializer; -import me.prettyprint.hector.api.beans.HColumn; -import me.prettyprint.hector.api.beans.HSuperColumn; -import me.prettyprint.hector.api.factory.HFactory; -import me.prettyprint.hector.api.mutation.Mutator; - -import org.apache.gora.persistency.Persistent; - -/** - * This class it not thread safe. - * According to Hector's JavaDoc a Mutator isn't thread safe, too. - * @see CassandraClient for safe usage. - */ -public class HectorUtils<K,T extends Persistent> { - - public static<K> void insertColumn(Mutator<K> mutator, K key, String columnFamily, ByteBuffer columnName, ByteBuffer columnValue, String ttlAttr) { - mutator.insert(key, columnFamily, createColumn(columnName, columnValue, ttlAttr)); - } - - public static<K> void insertColumn(Mutator<K> mutator, K key, String columnFamily, String columnName, ByteBuffer columnValue, String ttlAttr) { - mutator.insert(key, columnFamily, createColumn(columnName, columnValue, ttlAttr)); - } - - - public static<K> HColumn<ByteBuffer,ByteBuffer> createColumn(ByteBuffer name, ByteBuffer value, String ttlAttr) { - int ttl = Integer.parseInt(ttlAttr); - HColumn<ByteBuffer,ByteBuffer> col = HFactory.createColumn(name, value, ByteBufferSerializer.get(), ByteBufferSerializer.get()); - - if( 0 < ttl ) { - col.setTtl( ttl ); - } - - return col; - } - - public static<K> HColumn<String,ByteBuffer> createColumn(String name, ByteBuffer value, String ttlAttr) { - int ttl = Integer.parseInt(ttlAttr); - HColumn<String,ByteBuffer> col = HFactory.createColumn(name, value, StringSerializer.get(), ByteBufferSerializer.get()); - - if( 0 < ttl ) { - col.setTtl( ttl ); - } - - return col; - } - - public static<K> HColumn<Integer,ByteBuffer> createColumn(Integer name, ByteBuffer value, String ttlAttr) { - int ttl = Integer.parseInt(ttlAttr); - HColumn<Integer,ByteBuffer> col = HFactory.createColumn(name, value, IntegerSerializer.get(), ByteBufferSerializer.get()); - - if( 0 < ttl ) { - col.setTtl( ttl ); - } - - return col; - } - - - public static<K> void insertSubColumn(Mutator<K> mutator, K key, String columnFamily, String superColumnName, ByteBuffer columnName, ByteBuffer columnValue, String ttlAttr) { - mutator.insert(key, columnFamily, createSuperColumn(superColumnName, columnName, columnValue, ttlAttr)); - } - - public static<K> void insertSubColumn(Mutator<K> mutator, K key, String columnFamily, String superColumnName, String columnName, ByteBuffer columnValue, String ttlAttr) { - mutator.insert(key, columnFamily, createSuperColumn(superColumnName, columnName, columnValue, ttlAttr)); - } - - public static<K> void insertSubColumn(Mutator<K> mutator, K key, String columnFamily, String superColumnName, Integer columnName, ByteBuffer columnValue, String ttlAttr) { - mutator.insert(key, columnFamily, createSuperColumn(superColumnName, columnName, columnValue, ttlAttr)); - } - - - public static<K> void deleteSubColumn(Mutator<K> mutator, K key, String columnFamily, String superColumnName, ByteBuffer columnName) { - mutator.subDelete(key, columnFamily, superColumnName, columnName, StringSerializer.get(), ByteBufferSerializer.get()); - } - - public static<K> void deleteColumn(Mutator<K> mutator, K key, String columnFamily, ByteBuffer columnName){ - mutator.delete(key, columnFamily, columnName, ByteBufferSerializer.get()); - } - - public static<K> HSuperColumn<String,ByteBuffer,ByteBuffer> createSuperColumn(String superColumnName, ByteBuffer columnName, ByteBuffer columnValue, String ttlAttr) { - return HFactory.createSuperColumn(superColumnName, Arrays.asList(createColumn(columnName, columnValue, ttlAttr)), StringSerializer.get(), ByteBufferSerializer.get(), ByteBufferSerializer.get()); - } - - public static<K> HSuperColumn<String,String,ByteBuffer> createSuperColumn(String superColumnName, String columnName, ByteBuffer columnValue, String ttlAttr) { - return HFactory.createSuperColumn(superColumnName, Arrays.asList(createColumn(columnName, columnValue, ttlAttr)), StringSerializer.get(), StringSerializer.get(), ByteBufferSerializer.get()); - } - - public static<K> HSuperColumn<String,Integer,ByteBuffer> createSuperColumn(String superColumnName, Integer columnName, ByteBuffer columnValue, String ttlAttr) { - return HFactory.createSuperColumn(superColumnName, Arrays.asList(createColumn(columnName, columnValue, ttlAttr)), StringSerializer.get(), IntegerSerializer.get(), ByteBufferSerializer.get()); - } - -}