Author: lewismc
Date: Wed Jan 15 19:03:28 2014
New Revision: 1558518
URL: http://svn.apache.org/r1558518
Log:
GORA-283 Specify field name for types not being considered in gora-cassandra
Modified:
gora/branches/GORA_94/CHANGES.txt
gora/branches/GORA_94/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraColumn.java
gora/branches/GORA_94/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSuperColumn.java
gora/branches/GORA_94/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java
gora/branches/GORA_94/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java
gora/branches/GORA_94/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/HectorUtils.java
gora/branches/GORA_94/gora-core/src/main/java/org/apache/gora/mapreduce/GoraRecordWriter.java
Modified: gora/branches/GORA_94/CHANGES.txt
URL:
http://svn.apache.org/viewvc/gora/branches/GORA_94/CHANGES.txt?rev=1558518&r1=1558517&r2=1558518&view=diff
==============================================================================
--- gora/branches/GORA_94/CHANGES.txt (original)
+++ gora/branches/GORA_94/CHANGES.txt Wed Jan 15 19:03:28 2014
@@ -4,7 +4,7 @@
Gora Change Log
-* GORA-94 Upgrade to Apache Avro 1.7.x ==1st Attempt== (Ed Kohlwey via
lewismc)
+* GORA-283 Specify field name for types not being considered in gora-cassandra
(lewismc)
* GORA-285 Change logging at o.a.g.mapreduce.GoraRecordWriter from INFO to
WARN (lewismc)
Modified:
gora/branches/GORA_94/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraColumn.java
URL:
http://svn.apache.org/viewvc/gora/branches/GORA_94/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraColumn.java?rev=1558518&r1=1558517&r2=1558518&view=diff
==============================================================================
--- gora/branches/GORA_94/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraColumn.java (original)
+++ gora/branches/GORA_94/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraColumn.java Wed Jan 15 19:03:28 2014
@@ -36,7 +36,7 @@ public abstract class CassandraColumn {
public static final int SUB = 0;
public static final int SUPER = 1;
-
+
private String family;
private int type;
private Field field;
@@ -49,7 +49,7 @@ public abstract class CassandraColumn {
public int getUnionType(){
return unionType;
}
-
+
public String getFamily() {
return family;
}
@@ -65,19 +65,20 @@ public abstract class CassandraColumn {
public void setField(Field field) {
this.field = field;
}
-
+
protected Field getField() {
return this.field;
}
-
+
public abstract ByteBuffer getName();
public abstract Object getValue();
-
+
protected Object fromByteBuffer(Schema schema, ByteBuffer byteBuffer) {
Object value = null;
Serializer<?> serializer = GoraSerializerTypeInferer.getSerializer(schema);
if (serializer == null) {
- LOG.info("Schema is not supported: " + schema.toString());
+ LOG.warn("Schema: " + schema.getName() + " is not supported. No
serializer "
+ + "could be found. Please report this to [email protected]");
} else {
value = serializer.fromByteBuffer(byteBuffer);
}
Modified:
gora/branches/GORA_94/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSuperColumn.java
URL:
http://svn.apache.org/viewvc/gora/branches/GORA_94/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSuperColumn.java?rev=1558518&r1=1558517&r2=1558518&view=diff
==============================================================================
--- gora/branches/GORA_94/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSuperColumn.java (original)
+++ gora/branches/GORA_94/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSuperColumn.java Wed Jan 15 19:03:28 2014
@@ -115,7 +115,7 @@ public class CassandraSuperColumn extend
}
break;
default:
- LOG.info("Type not supported: " + type);
+ LOG.warn("Type: " + type.name() + " not supported for field: " +
field.name());
}
return value;
Modified:
gora/branches/GORA_94/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java
URL:
http://svn.apache.org/viewvc/gora/branches/GORA_94/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java?rev=1558518&r1=1558517&r2=1558518&view=diff
==============================================================================
--- gora/branches/GORA_94/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java (original)
+++ gora/branches/GORA_94/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java Wed Jan 15 19:03:28 2014
@@ -320,7 +320,7 @@ public class CassandraClient<K, T extend
byteBuffer = serializer.toByteBuffer(value);
}
if (byteBuffer == null) {
- LOG.info("value class=" + value.getClass().getName() + " value=" + value
+ " -> null");
+ LOG.warn("Serialization value for: " + value.getClass().getName() + " =
null");
}
return byteBuffer;
}
Modified:
gora/branches/GORA_94/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java
URL:
http://svn.apache.org/viewvc/gora/branches/GORA_94/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java?rev=1558518&r1=1558517&r2=1558518&view=diff
==============================================================================
--- gora/branches/GORA_94/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java (original)
+++ gora/branches/GORA_94/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java Wed Jan 15 19:03:28 2014
@@ -64,18 +64,18 @@ import org.slf4j.LoggerFactory;
* such as initialization, creating and deleting schemas (Cassandra
Keyspaces), etc.
*/
public class CassandraStore<K, T extends PersistentBase> extends
DataStoreBase<K, T> {
-
+
/** Logging implementation */
public static final Logger LOG =
LoggerFactory.getLogger(CassandraStore.class);
private CassandraClient<K, T> cassandraClient = new CassandraClient<K, T>();
- /**
- * Fixed string used to generate an extra column based on
- * the original field's name
- */
+ /**
+ * Fixed string used to generate an extra column based on
+ * the original field's name
+ */
public static String UNION_COL_SUFIX = "UnionIndex";
-
+
/**
* Default schema index used when AVRO Union data types are stored
*/
@@ -89,12 +89,12 @@ public class CassandraStore<K, T extends
* since in the meantime other threads are adding entries to the map.
*/
private Map<K, T> buffer = Collections.synchronizedMap(new LinkedHashMap<K,
T>());
-
+
/** The default constructor for CassandraStore */
public CassandraStore() throws Exception {
// this.cassandraClient.initialize();
}
-
+
/**
* Initialize is called when then the call to
* {@link org.apache.gora.store.DataStoreFactory#createDataStore(Class<D>
dataStoreClass, Class<K> keyClass, Class<T> persistent,
org.apache.hadoop.conf.Configuration conf)}
@@ -149,19 +149,19 @@ public class CassandraStore<K, T extends
*/
@Override
public Result<K, T> execute(Query<K, T> query) {
-
+
Map<String, List<String>> familyMap =
this.cassandraClient.getFamilyMap(query);
Map<String, String> reverseMap = this.cassandraClient.getReverseMap(query);
-
+
CassandraQuery<K, T> cassandraQuery = new CassandraQuery<K, T>();
cassandraQuery.setQuery(query);
cassandraQuery.setFamilyMap(familyMap);
-
+
CassandraResult<K, T> cassandraResult = new CassandraResult<K, T>(this,
query);
cassandraResult.setReverseMap(reverseMap);
CassandraResultSet<K> cassandraResultSet = new CassandraResultSet<K>();
-
+
// We query Cassandra keyspace by families.
for (String family : familyMap.keySet()) {
if (family == null) {
@@ -169,17 +169,17 @@ public class CassandraStore<K, T extends
}
if (this.cassandraClient.isSuper(family)) {
addSuperColumns(family, cassandraQuery, cassandraResultSet);
-
+
} else {
addSubColumns(family, cassandraQuery, cassandraResultSet);
}
}
-
+
cassandraResult.setResultSet(cassandraResultSet);
-
+
return cassandraResult;
}
-
+
/**
* When we add subcolumns, Gora keys are mapped to Cassandra partition keys
only.
* This is because we follow the Cassandra logic where column family data is
@@ -189,10 +189,10 @@ public class CassandraStore<K, T extends
CassandraResultSet cassandraResultSet) {
// select family columns that are included in the query
List<Row<K, ByteBuffer, ByteBuffer>> rows =
this.cassandraClient.execute(cassandraQuery, family);
-
+
for (Row<K, ByteBuffer, ByteBuffer> row : rows) {
K key = row.getKey();
-
+
// find associated row in the resultset
CassandraRow<K> cassandraRow = cassandraResultSet.getRow(key);
if (cassandraRow == null) {
@@ -200,16 +200,16 @@ public class CassandraStore<K, T extends
cassandraResultSet.putRow(key, cassandraRow);
cassandraRow.setKey(key);
}
-
+
ColumnSlice<ByteBuffer, ByteBuffer> columnSlice = row.getColumnSlice();
-
+
for (HColumn<ByteBuffer, ByteBuffer> hColumn : columnSlice.getColumns())
{
CassandraSubColumn cassandraSubColumn = new CassandraSubColumn();
cassandraSubColumn.setValue(hColumn);
cassandraSubColumn.setFamily(family);
cassandraRow.add(cassandraSubColumn);
}
-
+
}
}
@@ -220,7 +220,7 @@ public class CassandraStore<K, T extends
*/
private void addSuperColumns(String family, CassandraQuery<K, T>
cassandraQuery,
CassandraResultSet cassandraResultSet) {
-
+
List<SuperRow<K, String, ByteBuffer, ByteBuffer>> superRows =
this.cassandraClient.executeSuper(cassandraQuery, family);
for (SuperRow<K, String, ByteBuffer, ByteBuffer> superRow: superRows) {
K key = superRow.getKey();
@@ -230,7 +230,7 @@ public class CassandraStore<K, T extends
cassandraResultSet.putRow(key, cassandraRow);
cassandraRow.setKey(key);
}
-
+
SuperSlice<String, ByteBuffer, ByteBuffer> superSlice =
superRow.getSuperSlice();
for (HSuperColumn<String, ByteBuffer, ByteBuffer> hSuperColumn:
superSlice.getSuperColumns()) {
CassandraSuperColumn cassandraSuperColumn = new CassandraSuperColumn();
@@ -251,30 +251,30 @@ public class CassandraStore<K, T extends
*/
@Override
public void flush() {
-
+
Set<K> keys = this.buffer.keySet();
-
+
// this duplicates memory footprint
@SuppressWarnings("unchecked")
K[] keyArray = (K[]) keys.toArray();
-
+
// iterating over the key set directly would throw
//ConcurrentModificationException with java.util.HashMap and subclasses
for (K key: keyArray) {
T value = this.buffer.get(key);
if (value == null) {
- LOG.info("Value to update is null for key " + key);
+ LOG.info("Value to update is null for key: " + key);
continue;
}
Schema schema = value.getSchema();
-
+
for (Field field: schema.getFields()) {
if (value.isDirty(field.pos())) {
addOrUpdateField(key, field, value.get(field.pos()));
}
}
}
-
+
// remove flushed rows from the buffer as all
// added or updated fields should now have been written.
for (K key: keyArray) {
@@ -312,7 +312,7 @@ public class CassandraStore<K, T extends
partitions.add(pqi);
return partitions;
}
-
+
/**
* In Cassandra Schemas are referred to as Keyspaces
* @return Keyspace
@@ -373,33 +373,33 @@ public class CassandraStore<K, T extends
Type type = field.schema().getType();
Object fieldValue = value.get(field.pos());
// check if field has a nested structure (array, map, record or union)
-
+
switch(type) {
- case RECORD:
- Persistent persistent = (Persistent) fieldValue;
- Persistent newRecord = (Persistent)
SpecificData.get().newRecord(persistent, persistent.getSchema());
- for (Field member: field.schema().getFields()) {
- if (member.pos() == 0 || !persistent.isDirty()) {
- continue;
- }
- newRecord.put(member.pos(), persistent.get(member.pos()));
+ case RECORD:
+ Persistent persistent = (Persistent) fieldValue;
+ Persistent newRecord = (Persistent)
SpecificData.get().newRecord(persistent, persistent.getSchema());
+ for (Field member: field.schema().getFields()) {
+ if (member.pos() == 0 || !persistent.isDirty()) {
+ continue;
}
- fieldValue = newRecord;
- break;
- case MAP:
- Map<?, ?> map = (Map<?, ?>) fieldValue;
- fieldValue = map;
- break;
- case ARRAY:
- fieldValue = (List<?>) fieldValue;
- break;
- case UNION:
- // storing the union selected schema, the actual value will
- // be stored as soon as we get break out.
- int schemaPos = getUnionSchema(fieldValue,field.schema());
- p.put( schemaPos, p.getSchema().getField(field.name() +
CassandraStore.UNION_COL_SUFIX));
- //p.put(fieldPos, fieldValue);
- break;
+ newRecord.put(member.pos(), persistent.get(member.pos()));
+ }
+ fieldValue = newRecord;
+ break;
+ case MAP:
+ Map<?, ?> map = (Map<?, ?>) fieldValue;
+ fieldValue = map;
+ break;
+ case ARRAY:
+ fieldValue = (List<?>) fieldValue;
+ break;
+ case UNION:
+ // storing the union selected schema, the actual value will
+ // be stored as soon as we get break out.
+ int schemaPos = getUnionSchema(fieldValue,field.schema());
+ p.put( schemaPos, p.getSchema().getField(field.name() +
CassandraStore.UNION_COL_SUFIX));
+ //p.put(fieldPos, fieldValue);
+ break;
default:
break;
}
@@ -422,38 +422,38 @@ public class CassandraStore<K, T extends
// checking if the value to be updated is used for saving union schema
if (field.name().indexOf(CassandraStore.UNION_COL_SUFIX) < 0){
switch (type) {
- case STRING:
- case BOOLEAN:
- case INT:
- case LONG:
- case BYTES:
- case FLOAT:
- case DOUBLE:
- case FIXED:
- this.cassandraClient.addColumn(key, field.name(), value);
- break;
- case RECORD:
- if (value != null) {
- if (value instanceof PersistentBase) {
- PersistentBase persistentBase = (PersistentBase) value;
- for (Field member: schema.getFields()) {
-
- // TODO: hack, do not store empty arrays
- Object memberValue = persistentBase.get(member.pos());
- if (memberValue instanceof List<?>) {
- if (((List<?>)memberValue).size() == 0) {
- continue;
- }
- } else if (memberValue instanceof Map<?,?>) {
- if (((Map<?, ?>)memberValue).size() == 0) {
- continue;
- }
+ case STRING:
+ case BOOLEAN:
+ case INT:
+ case LONG:
+ case BYTES:
+ case FLOAT:
+ case DOUBLE:
+ case FIXED:
+ this.cassandraClient.addColumn(key, field.name(), value);
+ break;
+ case RECORD:
+ if (value != null) {
+ if (value instanceof PersistentBase) {
+ PersistentBase persistentBase = (PersistentBase) value;
+ for (Field member: schema.getFields()) {
+
+ // TODO: hack, do not store empty arrays
+ Object memberValue = persistentBase.get(member.pos());
+ if (memberValue instanceof List<?>) {
+ if (((List<?>)memberValue).size() == 0) {
+ continue;
+ }
+ } else if (memberValue instanceof Map<?,?>) {
+ if (((Map<?, ?>)memberValue).size() == 0) {
+ continue;
}
- this.cassandraClient.addSubColumn(key, field.name(),
- member.name(), memberValue);
}
+ this.cassandraClient.addSubColumn(key, field.name(),
+ member.name(), memberValue);
+ }
} else {
- LOG.info("Record not supported: " + value.toString());
+ LOG.warn("Record with value: " + value.toString() + " not
supported for field: " + field.name());
}
}
break;
@@ -462,7 +462,7 @@ public class CassandraStore<K, T extends
if (value instanceof Map<?, ?>) {
this.cassandraClient.addStatefulHashMap(key, field.name(),
(Map<CharSequence,Object>)value);
} else {
- LOG.info("Map not supported: " + value.toString());
+ LOG.warn("Map with value: " + value.toString() + " not supported
for field: " + field.name());
}
}
break;
@@ -471,13 +471,13 @@ public class CassandraStore<K, T extends
if (value instanceof GenericArray<?>) {
this.cassandraClient.addGenericArray(key, field.name(),
(GenericArray<?>)value);
} else {
- LOG.info("Array not supported: " + value.toString());
+ LOG.warn("Array with value: " + value.toString() + " not supported
for field: " + field.name());
}
}
break;
case UNION:
if(value != null) {
- LOG.info("Union being supported: " + value.toString());
+ LOG.debug("Union with value: " + value.toString() + " at index: " +
getUnionSchema(value, schema) + " supported for field: " + field.name());
// adding union schema index
String columnName = field.name() + UNION_COL_SUFIX;
String familyName =
this.cassandraClient.getCassandraMapping().getFamily(field.name());
@@ -486,11 +486,12 @@ public class CassandraStore<K, T extends
// adding union value
this.cassandraClient.addColumn(key, field.name(), value);
} else {
- LOG.info("Union not supported: " + value.toString());
+ LOG.warn("Union with value: " + value.toString() + " at index: " +
getUnionSchema(value, schema) + " not supported for field: " + field.name());
}
break;
default:
- LOG.info("Type not considered: " + type.name());
+ LOG.warn("Type: " + type.name() + " with value: " + value.toString() +
+ " not considered for field: " + field.name() + ". Please report
this to [email protected]");
}
}
}
Modified:
gora/branches/GORA_94/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/HectorUtils.java
URL:
http://svn.apache.org/viewvc/gora/branches/GORA_94/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/HectorUtils.java?rev=1558518&r1=1558517&r2=1558518&view=diff
==============================================================================
--- gora/branches/GORA_94/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/HectorUtils.java (original)
+++ gora/branches/GORA_94/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/HectorUtils.java Wed Jan 15 19:03:28 2014
@@ -28,13 +28,9 @@ import me.prettyprint.hector.api.beans.H
import me.prettyprint.hector.api.beans.HSuperColumn;
import me.prettyprint.hector.api.factory.HFactory;
import me.prettyprint.hector.api.mutation.Mutator;
-import me.prettyprint.hector.api.Serializer;
import org.apache.gora.persistency.Persistent;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
/**
* This class it not thread safe.
* According to Hector's JavaDoc a Mutator isn't thread safe, too.
@@ -42,8 +38,6 @@ import org.slf4j.LoggerFactory;
*/
public class HectorUtils<K,T extends Persistent> {
- public static final Logger LOG = LoggerFactory.getLogger(HectorUtils.class);
-
public static<K> void insertColumn(Mutator<K> mutator, K key, String
columnFamily, ByteBuffer columnName, ByteBuffer columnValue) {
mutator.insert(key, columnFamily, createColumn(columnName, columnValue));
}
@@ -84,14 +78,17 @@ public class HectorUtils<K,T extends Per
}
+ @SuppressWarnings("unchecked")
public static<K> HSuperColumn<String,ByteBuffer,ByteBuffer>
createSuperColumn(String superColumnName, ByteBuffer columnName, ByteBuffer
columnValue) {
return HFactory.createSuperColumn(superColumnName,
Arrays.asList(createColumn(columnName, columnValue)), StringSerializer.get(),
ByteBufferSerializer.get(), ByteBufferSerializer.get());
}
+ @SuppressWarnings("unchecked")
public static<K> HSuperColumn<String,String,ByteBuffer>
createSuperColumn(String superColumnName, String columnName, ByteBuffer
columnValue) {
return HFactory.createSuperColumn(superColumnName,
Arrays.asList(createColumn(columnName, columnValue)), StringSerializer.get(),
StringSerializer.get(), ByteBufferSerializer.get());
}
+ @SuppressWarnings("unchecked")
public static<K> HSuperColumn<String,Integer,ByteBuffer>
createSuperColumn(String superColumnName, Integer columnName, ByteBuffer
columnValue) {
return HFactory.createSuperColumn(superColumnName,
Arrays.asList(createColumn(columnName, columnValue)), StringSerializer.get(),
IntegerSerializer.get(), ByteBufferSerializer.get());
}
Modified:
gora/branches/GORA_94/gora-core/src/main/java/org/apache/gora/mapreduce/GoraRecordWriter.java
URL:
http://svn.apache.org/viewvc/gora/branches/GORA_94/gora-core/src/main/java/org/apache/gora/mapreduce/GoraRecordWriter.java?rev=1558518&r1=1558517&r2=1558518&view=diff
==============================================================================
--- gora/branches/GORA_94/gora-core/src/main/java/org/apache/gora/mapreduce/GoraRecordWriter.java (original)
+++ gora/branches/GORA_94/gora-core/src/main/java/org/apache/gora/mapreduce/GoraRecordWriter.java Wed Jan 15 19:03:28 2014
@@ -69,7 +69,7 @@ public class GoraRecordWriter<K, T> exte
store.flush();
}
}catch(Exception e){
- LOG.warn("Exception at GoraRecordWriter.class while writing to
datastore." + e.getMessage());
+ LOG.warn("Exception at GoraRecordWriter.class while writing to
datastore. " + e.getMessage());
}
}
}