Author: lewismc
Date: Wed Jan 15 19:03:28 2014
New Revision: 1558518

URL: http://svn.apache.org/r1558518
Log:
GORA-283 Specify field name for types not being considered in gora-cassandra

Modified:
    gora/branches/GORA_94/CHANGES.txt
    
gora/branches/GORA_94/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraColumn.java
    
gora/branches/GORA_94/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSuperColumn.java
    
gora/branches/GORA_94/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java
    
gora/branches/GORA_94/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java
    
gora/branches/GORA_94/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/HectorUtils.java
    
gora/branches/GORA_94/gora-core/src/main/java/org/apache/gora/mapreduce/GoraRecordWriter.java

Modified: gora/branches/GORA_94/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/gora/branches/GORA_94/CHANGES.txt?rev=1558518&r1=1558517&r2=1558518&view=diff
==============================================================================
--- gora/branches/GORA_94/CHANGES.txt (original)
+++ gora/branches/GORA_94/CHANGES.txt Wed Jan 15 19:03:28 2014
@@ -4,7 +4,7 @@
 
 Gora Change Log
 
-* GORA-94 Upgrade to Apache Avro 1.7.x  ==1st Attempt== (Ed Kohlwey via 
lewismc)
+* GORA-283 Specify field name for types not being considered in gora-cassandra 
(lewismc)
 
 * GORA-285 Change logging at o.a.g.mapreduce.GoraRecordWriter from INFO to 
WARN (lewismc)
 

Modified: 
gora/branches/GORA_94/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraColumn.java
URL: 
http://svn.apache.org/viewvc/gora/branches/GORA_94/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraColumn.java?rev=1558518&r1=1558517&r2=1558518&view=diff
==============================================================================
--- 
gora/branches/GORA_94/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraColumn.java
 (original)
+++ 
gora/branches/GORA_94/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraColumn.java
 Wed Jan 15 19:03:28 2014
@@ -36,7 +36,7 @@ public abstract class CassandraColumn {
 
   public static final int SUB = 0;
   public static final int SUPER = 1;
-  
+
   private String family;
   private int type;
   private Field field;
@@ -49,7 +49,7 @@ public abstract class CassandraColumn {
   public int getUnionType(){
     return unionType;
   }
-  
+
   public String getFamily() {
     return family;
   }
@@ -65,19 +65,20 @@ public abstract class CassandraColumn {
   public void setField(Field field) {
     this.field = field;
   }
-  
+
   protected Field getField() {
     return this.field;
   }
-  
+
   public abstract ByteBuffer getName();
   public abstract Object getValue();
-  
+
   protected Object fromByteBuffer(Schema schema, ByteBuffer byteBuffer) {
     Object value = null;
     Serializer<?> serializer = GoraSerializerTypeInferer.getSerializer(schema);
     if (serializer == null) {
-      LOG.info("Schema is not supported: " + schema.toString());
+      LOG.warn("Schema: " + schema.getName() + " is not supported. No 
serializer "
+          + "could be found. Please report this to [email protected]");
     } else {
       value = serializer.fromByteBuffer(byteBuffer);
     }

Modified: 
gora/branches/GORA_94/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSuperColumn.java
URL: 
http://svn.apache.org/viewvc/gora/branches/GORA_94/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSuperColumn.java?rev=1558518&r1=1558517&r2=1558518&view=diff
==============================================================================
--- 
gora/branches/GORA_94/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSuperColumn.java
 (original)
+++ 
gora/branches/GORA_94/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSuperColumn.java
 Wed Jan 15 19:03:28 2014
@@ -115,7 +115,7 @@ public class CassandraSuperColumn extend
         }
         break;
       default:
-        LOG.info("Type not supported: " + type);
+        LOG.warn("Type: " + type.name() + " not supported for field: " + 
field.name());
     }
     
     return value;

Modified: 
gora/branches/GORA_94/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java
URL: 
http://svn.apache.org/viewvc/gora/branches/GORA_94/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java?rev=1558518&r1=1558517&r2=1558518&view=diff
==============================================================================
--- 
gora/branches/GORA_94/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java
 (original)
+++ 
gora/branches/GORA_94/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java
 Wed Jan 15 19:03:28 2014
@@ -320,7 +320,7 @@ public class CassandraClient<K, T extend
       byteBuffer = serializer.toByteBuffer(value);
     }
     if (byteBuffer == null) {
-      LOG.info("value class=" + value.getClass().getName() + " value=" + value 
+ " -> null");
+      LOG.warn("Serialization value for: " + value.getClass().getName() + " = 
null");
     }
     return byteBuffer;
   }

Modified: 
gora/branches/GORA_94/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java
URL: 
http://svn.apache.org/viewvc/gora/branches/GORA_94/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java?rev=1558518&r1=1558517&r2=1558518&view=diff
==============================================================================
--- 
gora/branches/GORA_94/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java
 (original)
+++ 
gora/branches/GORA_94/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java
 Wed Jan 15 19:03:28 2014
@@ -64,18 +64,18 @@ import org.slf4j.LoggerFactory;
  * such as initialization, creating and deleting schemas (Cassandra 
Keyspaces), etc.  
  */
 public class CassandraStore<K, T extends PersistentBase> extends 
DataStoreBase<K, T> {
-  
+
   /** Logging implementation */
   public static final Logger LOG = 
LoggerFactory.getLogger(CassandraStore.class);
 
   private CassandraClient<K, T>  cassandraClient = new CassandraClient<K, T>();
 
- /**
-  * Fixed string used to generate an extra column based on 
-  * the original field's name
-  */
+  /**
+   * Fixed string used to generate an extra column based on 
+   * the original field's name
+   */
   public static String UNION_COL_SUFIX = "UnionIndex";
-  
+
   /**
    * Default schema index used when AVRO Union data types are stored
    */
@@ -89,12 +89,12 @@ public class CassandraStore<K, T extends
    * since in the meantime other threads are adding entries to the map.
    */
   private Map<K, T> buffer = Collections.synchronizedMap(new LinkedHashMap<K, 
T>());
-  
+
   /** The default constructor for CassandraStore */
   public CassandraStore() throws Exception {
     // this.cassandraClient.initialize();
   }
- 
+
   /** 
    * Initialize is called when then the call to 
    * {@link org.apache.gora.store.DataStoreFactory#createDataStore(Class<D> 
dataStoreClass, Class<K> keyClass, Class<T> persistent, 
org.apache.hadoop.conf.Configuration conf)}
@@ -149,19 +149,19 @@ public class CassandraStore<K, T extends
    */
   @Override
   public Result<K, T> execute(Query<K, T> query) {
-    
+
     Map<String, List<String>> familyMap = 
this.cassandraClient.getFamilyMap(query);
     Map<String, String> reverseMap = this.cassandraClient.getReverseMap(query);
-    
+
     CassandraQuery<K, T> cassandraQuery = new CassandraQuery<K, T>();
     cassandraQuery.setQuery(query);
     cassandraQuery.setFamilyMap(familyMap);
-    
+
     CassandraResult<K, T> cassandraResult = new CassandraResult<K, T>(this, 
query);
     cassandraResult.setReverseMap(reverseMap);
 
     CassandraResultSet<K> cassandraResultSet = new CassandraResultSet<K>();
-    
+
     // We query Cassandra keyspace by families.
     for (String family : familyMap.keySet()) {
       if (family == null) {
@@ -169,17 +169,17 @@ public class CassandraStore<K, T extends
       }
       if (this.cassandraClient.isSuper(family)) {
         addSuperColumns(family, cassandraQuery, cassandraResultSet);
-         
+
       } else {
         addSubColumns(family, cassandraQuery, cassandraResultSet);
       }
     }
-    
+
     cassandraResult.setResultSet(cassandraResultSet);
-    
+
     return cassandraResult;
   }
-  
+
   /**
    * When we add subcolumns, Gora keys are mapped to Cassandra partition keys 
only. 
    * This is because we follow the Cassandra logic where column family data is 
@@ -189,10 +189,10 @@ public class CassandraStore<K, T extends
       CassandraResultSet cassandraResultSet) {
     // select family columns that are included in the query
     List<Row<K, ByteBuffer, ByteBuffer>> rows = 
this.cassandraClient.execute(cassandraQuery, family);
-    
+
     for (Row<K, ByteBuffer, ByteBuffer> row : rows) {
       K key = row.getKey();
-      
+
       // find associated row in the resultset
       CassandraRow<K> cassandraRow = cassandraResultSet.getRow(key);
       if (cassandraRow == null) {
@@ -200,16 +200,16 @@ public class CassandraStore<K, T extends
         cassandraResultSet.putRow(key, cassandraRow);
         cassandraRow.setKey(key);
       }
-      
+
       ColumnSlice<ByteBuffer, ByteBuffer> columnSlice = row.getColumnSlice();
-      
+
       for (HColumn<ByteBuffer, ByteBuffer> hColumn : columnSlice.getColumns()) 
{
         CassandraSubColumn cassandraSubColumn = new CassandraSubColumn();
         cassandraSubColumn.setValue(hColumn);
         cassandraSubColumn.setFamily(family);
         cassandraRow.add(cassandraSubColumn);
       }
-      
+
     }
   }
 
@@ -220,7 +220,7 @@ public class CassandraStore<K, T extends
    */
   private void addSuperColumns(String family, CassandraQuery<K, T> 
cassandraQuery, 
       CassandraResultSet cassandraResultSet) {
-    
+
     List<SuperRow<K, String, ByteBuffer, ByteBuffer>> superRows = 
this.cassandraClient.executeSuper(cassandraQuery, family);
     for (SuperRow<K, String, ByteBuffer, ByteBuffer> superRow: superRows) {
       K key = superRow.getKey();
@@ -230,7 +230,7 @@ public class CassandraStore<K, T extends
         cassandraResultSet.putRow(key, cassandraRow);
         cassandraRow.setKey(key);
       }
-      
+
       SuperSlice<String, ByteBuffer, ByteBuffer> superSlice = 
superRow.getSuperSlice();
       for (HSuperColumn<String, ByteBuffer, ByteBuffer> hSuperColumn: 
superSlice.getSuperColumns()) {
         CassandraSuperColumn cassandraSuperColumn = new CassandraSuperColumn();
@@ -251,30 +251,30 @@ public class CassandraStore<K, T extends
    */
   @Override
   public void flush() {
-    
+
     Set<K> keys = this.buffer.keySet();
-    
+
     // this duplicates memory footprint
     @SuppressWarnings("unchecked")
     K[] keyArray = (K[]) keys.toArray();
-    
+
     // iterating over the key set directly would throw 
     //ConcurrentModificationException with java.util.HashMap and subclasses
     for (K key: keyArray) {
       T value = this.buffer.get(key);
       if (value == null) {
-        LOG.info("Value to update is null for key " + key);
+        LOG.info("Value to update is null for key: " + key);
         continue;
       }
       Schema schema = value.getSchema();
-      
+
       for (Field field: schema.getFields()) {
         if (value.isDirty(field.pos())) {
           addOrUpdateField(key, field, value.get(field.pos()));
         }
       }
     }
-    
+
     // remove flushed rows from the buffer as all 
     // added or updated fields should now have been written.
     for (K key: keyArray) {
@@ -312,7 +312,7 @@ public class CassandraStore<K, T extends
     partitions.add(pqi);
     return partitions;
   }
-  
+
   /**
    * In Cassandra Schemas are referred to as Keyspaces
    * @return Keyspace
@@ -373,33 +373,33 @@ public class CassandraStore<K, T extends
       Type type = field.schema().getType();
       Object fieldValue = value.get(field.pos());
       // check if field has a nested structure (array, map, record or union)
-        
+
       switch(type) {
-        case RECORD:
-          Persistent persistent = (Persistent) fieldValue;
-          Persistent newRecord = (Persistent) 
SpecificData.get().newRecord(persistent, persistent.getSchema());
-          for (Field member: field.schema().getFields()) {
-            if (member.pos() == 0 || !persistent.isDirty()) {
-              continue;
-            }
-            newRecord.put(member.pos(), persistent.get(member.pos()));
+      case RECORD:
+        Persistent persistent = (Persistent) fieldValue;
+        Persistent newRecord = (Persistent) 
SpecificData.get().newRecord(persistent, persistent.getSchema());
+        for (Field member: field.schema().getFields()) {
+          if (member.pos() == 0 || !persistent.isDirty()) {
+            continue;
           }
-          fieldValue = newRecord;
-          break;
-        case MAP:
-          Map<?, ?> map = (Map<?, ?>) fieldValue;
-          fieldValue = map;
-          break;
-        case ARRAY:
-          fieldValue = (List<?>) fieldValue;
-          break;
-        case UNION:
-          // storing the union selected schema, the actual value will 
-          // be stored as soon as we get break out.
-          int schemaPos = getUnionSchema(fieldValue,field.schema());
-          p.put( schemaPos, p.getSchema().getField(field.name() + 
CassandraStore.UNION_COL_SUFIX));
-          //p.put(fieldPos, fieldValue);
-          break;
+          newRecord.put(member.pos(), persistent.get(member.pos()));
+        }
+        fieldValue = newRecord;
+        break;
+      case MAP:
+        Map<?, ?> map = (Map<?, ?>) fieldValue;
+        fieldValue = map;
+        break;
+      case ARRAY:
+        fieldValue = (List<?>) fieldValue;
+        break;
+      case UNION:
+        // storing the union selected schema, the actual value will 
+        // be stored as soon as we get break out.
+        int schemaPos = getUnionSchema(fieldValue,field.schema());
+        p.put( schemaPos, p.getSchema().getField(field.name() + 
CassandraStore.UNION_COL_SUFIX));
+        //p.put(fieldPos, fieldValue);
+        break;
       default:
         break;
       }
@@ -422,38 +422,38 @@ public class CassandraStore<K, T extends
     // checking if the value to be updated is used for saving union schema
     if (field.name().indexOf(CassandraStore.UNION_COL_SUFIX) < 0){
       switch (type) {
-        case STRING:
-        case BOOLEAN:
-        case INT:
-        case LONG:
-        case BYTES:
-        case FLOAT:
-        case DOUBLE:
-        case FIXED:
-          this.cassandraClient.addColumn(key, field.name(), value);
-          break;
-        case RECORD:
-          if (value != null) {
-            if (value instanceof PersistentBase) {
-              PersistentBase persistentBase = (PersistentBase) value;
-              for (Field member: schema.getFields()) {
-                
-                // TODO: hack, do not store empty arrays
-                Object memberValue = persistentBase.get(member.pos());
-                if (memberValue instanceof List<?>) {
-                  if (((List<?>)memberValue).size() == 0) {
-                    continue;
-                  }
-                } else if (memberValue instanceof Map<?,?>) {
-                  if (((Map<?, ?>)memberValue).size() == 0) {
-                    continue;
-                  }
+      case STRING:
+      case BOOLEAN:
+      case INT:
+      case LONG:
+      case BYTES:
+      case FLOAT:
+      case DOUBLE:
+      case FIXED:
+        this.cassandraClient.addColumn(key, field.name(), value);
+        break;
+      case RECORD:
+        if (value != null) {
+          if (value instanceof PersistentBase) {
+            PersistentBase persistentBase = (PersistentBase) value;
+            for (Field member: schema.getFields()) {
+
+              // TODO: hack, do not store empty arrays
+              Object memberValue = persistentBase.get(member.pos());
+              if (memberValue instanceof List<?>) {
+                if (((List<?>)memberValue).size() == 0) {
+                  continue;
+                }
+              } else if (memberValue instanceof Map<?,?>) {
+                if (((Map<?, ?>)memberValue).size() == 0) {
+                  continue;
                 }
-                this.cassandraClient.addSubColumn(key, field.name(), 
-                    member.name(), memberValue);
               }
+              this.cassandraClient.addSubColumn(key, field.name(), 
+                  member.name(), memberValue);
+            }
           } else {
-            LOG.info("Record not supported: " + value.toString());
+            LOG.warn("Record with value: " + value.toString() + " not 
supported for field: " + field.name());
           }
         }
         break;
@@ -462,7 +462,7 @@ public class CassandraStore<K, T extends
           if (value instanceof Map<?, ?>) {
             this.cassandraClient.addStatefulHashMap(key, field.name(), 
(Map<CharSequence,Object>)value);
           } else {
-            LOG.info("Map not supported: " + value.toString());
+            LOG.warn("Map with value: " + value.toString() + " not supported 
for field: " + field.name());
           }
         }
         break;
@@ -471,13 +471,13 @@ public class CassandraStore<K, T extends
           if (value instanceof GenericArray<?>) {
             this.cassandraClient.addGenericArray(key, field.name(), 
(GenericArray<?>)value);
           } else {
-            LOG.info("Array not supported: " + value.toString());
+            LOG.warn("Array with value: " + value.toString() + " not supported 
for field: " + field.name());
           }
         }
         break;
       case UNION:
         if(value != null) {
-          LOG.info("Union being supported: " + value.toString());
+          LOG.debug("Union with value: " + value.toString() + " at index: " + 
getUnionSchema(value, schema) + " supported for field: " + field.name());
           // adding union schema index
           String columnName = field.name() + UNION_COL_SUFIX;
           String familyName = 
this.cassandraClient.getCassandraMapping().getFamily(field.name());
@@ -486,11 +486,12 @@ public class CassandraStore<K, T extends
           // adding union value
           this.cassandraClient.addColumn(key, field.name(), value);
         } else {
-          LOG.info("Union not supported: " + value.toString());
+          LOG.warn("Union with value: " + value.toString() + " at index: " + 
getUnionSchema(value, schema) + " not supported for field: " + field.name());
         }
         break;
       default:
-        LOG.info("Type not considered: " + type.name());
+        LOG.warn("Type: " + type.name() + " with value: " + value.toString() + 
+            " not considered for field: " + field.name() + ". Please report 
this to [email protected]");
       }
     }
   }

Modified: 
gora/branches/GORA_94/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/HectorUtils.java
URL: 
http://svn.apache.org/viewvc/gora/branches/GORA_94/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/HectorUtils.java?rev=1558518&r1=1558517&r2=1558518&view=diff
==============================================================================
--- 
gora/branches/GORA_94/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/HectorUtils.java
 (original)
+++ 
gora/branches/GORA_94/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/HectorUtils.java
 Wed Jan 15 19:03:28 2014
@@ -28,13 +28,9 @@ import me.prettyprint.hector.api.beans.H
 import me.prettyprint.hector.api.beans.HSuperColumn;
 import me.prettyprint.hector.api.factory.HFactory;
 import me.prettyprint.hector.api.mutation.Mutator;
-import me.prettyprint.hector.api.Serializer;
 
 import org.apache.gora.persistency.Persistent;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 /**
  * This class it not thread safe.
  * According to Hector's JavaDoc a Mutator isn't thread safe, too.
@@ -42,8 +38,6 @@ import org.slf4j.LoggerFactory;
  */
 public class HectorUtils<K,T extends Persistent> {
 
-  public static final Logger LOG = LoggerFactory.getLogger(HectorUtils.class);
-  
   public static<K> void insertColumn(Mutator<K> mutator, K key, String 
columnFamily, ByteBuffer columnName, ByteBuffer columnValue) {
     mutator.insert(key, columnFamily, createColumn(columnName, columnValue));
   }
@@ -84,14 +78,17 @@ public class HectorUtils<K,T extends Per
   }
 
 
+  @SuppressWarnings("unchecked")
   public static<K> HSuperColumn<String,ByteBuffer,ByteBuffer> 
createSuperColumn(String superColumnName, ByteBuffer columnName, ByteBuffer 
columnValue) {
     return HFactory.createSuperColumn(superColumnName, 
Arrays.asList(createColumn(columnName, columnValue)), StringSerializer.get(), 
ByteBufferSerializer.get(), ByteBufferSerializer.get());
   }
 
+  @SuppressWarnings("unchecked")
   public static<K> HSuperColumn<String,String,ByteBuffer> 
createSuperColumn(String superColumnName, String columnName, ByteBuffer 
columnValue) {
     return HFactory.createSuperColumn(superColumnName, 
Arrays.asList(createColumn(columnName, columnValue)), StringSerializer.get(), 
StringSerializer.get(), ByteBufferSerializer.get());
   }
 
+  @SuppressWarnings("unchecked")
   public static<K> HSuperColumn<String,Integer,ByteBuffer> 
createSuperColumn(String superColumnName, Integer columnName, ByteBuffer 
columnValue) {
     return HFactory.createSuperColumn(superColumnName, 
Arrays.asList(createColumn(columnName, columnValue)), StringSerializer.get(), 
IntegerSerializer.get(), ByteBufferSerializer.get());
   }

Modified: 
gora/branches/GORA_94/gora-core/src/main/java/org/apache/gora/mapreduce/GoraRecordWriter.java
URL: 
http://svn.apache.org/viewvc/gora/branches/GORA_94/gora-core/src/main/java/org/apache/gora/mapreduce/GoraRecordWriter.java?rev=1558518&r1=1558517&r2=1558518&view=diff
==============================================================================
--- 
gora/branches/GORA_94/gora-core/src/main/java/org/apache/gora/mapreduce/GoraRecordWriter.java
 (original)
+++ 
gora/branches/GORA_94/gora-core/src/main/java/org/apache/gora/mapreduce/GoraRecordWriter.java
 Wed Jan 15 19:03:28 2014
@@ -69,7 +69,7 @@ public class GoraRecordWriter<K, T> exte
         store.flush();
       }
     }catch(Exception e){
-      LOG.warn("Exception at GoraRecordWriter.class while writing to 
datastore." + e.getMessage());
+      LOG.warn("Exception at GoraRecordWriter.class while writing to 
datastore. " + e.getMessage());
     }
   }
 }


Reply via email to