Author: lewismc
Date: Thu Feb 13 20:19:22 2014
New Revision: 1568028

URL: http://svn.apache.org/r1568028
Log:
GORA-245v5

Modified:
    gora/branches/GORA_94/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraColumn.java
    gora/branches/GORA_94/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraResult.java
    gora/branches/GORA_94/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSubColumn.java
    gora/branches/GORA_94/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSuperColumn.java
    gora/branches/GORA_94/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/CharSequenceSerializer.java
    gora/branches/GORA_94/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/GoraSerializerTypeInferer.java
    gora/branches/GORA_94/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java
    gora/branches/GORA_94/gora-cassandra/src/test/conf/gora-cassandra-mapping.xml
    gora/branches/GORA_94/gora-cassandra/src/test/java/org/apache/gora/cassandra/store/TestCassandraStore.java

Modified: gora/branches/GORA_94/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraColumn.java
URL: http://svn.apache.org/viewvc/gora/branches/GORA_94/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraColumn.java?rev=1568028&r1=1568027&r2=1568028&view=diff
==============================================================================
--- gora/branches/GORA_94/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraColumn.java (original)
+++ gora/branches/GORA_94/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraColumn.java Thu Feb 13 20:19:22 2014
@@ -18,12 +18,18 @@
 
 package org.apache.gora.cassandra.query;
 
+import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.concurrent.ConcurrentHashMap;
 
 import me.prettyprint.hector.api.Serializer;
 
 import org.apache.avro.Schema;
 import org.apache.avro.Schema.Field;
+import org.apache.avro.Schema.Type;
+import org.apache.avro.io.BinaryDecoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.specific.SpecificDatumReader;
 import org.apache.gora.cassandra.serializers.GoraSerializerTypeInferer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -41,7 +47,20 @@ public abstract class CassandraColumn {
   private int type;
   private Field field;
   private int unionType;
-
+  
+  public static final ThreadLocal<BinaryDecoder> decoders =
+      new ThreadLocal<BinaryDecoder>();
+
+  /*
+   * Create a threadlocal map for the datum readers and writers, because
+   * they are not thread safe, at least not before Avro 1.4.0 (See AVRO-650).
+   * When they are thread safe, it is possible to maintain a single reader and
+   * writer pair for every schema, instead of one for every thread.
+   */
+  
+  public static final ConcurrentHashMap<String, SpecificDatumReader<?>> readerMap = 
+      new ConcurrentHashMap<String, SpecificDatumReader<?>>();
+  
   public void setUnionType(int pUnionType){
     this.unionType = pUnionType;
   }
@@ -72,7 +91,8 @@ public abstract class CassandraColumn {
 
   public abstract ByteBuffer getName();
   public abstract Object getValue();
-
+  
+  @SuppressWarnings({ "rawtypes" })
   protected Object fromByteBuffer(Schema schema, ByteBuffer byteBuffer) {
     Object value = null;
     Serializer<?> serializer = GoraSerializerTypeInferer.getSerializer(schema);
@@ -81,6 +101,32 @@ public abstract class CassandraColumn {
           + "could be found. Please report this to dev@gora.apache.org");
     } else {
       value = serializer.fromByteBuffer(byteBuffer);
+      if (schema.getType().equals(Type.RECORD)){
+        String schemaId = schema.getFullName();      
+        
+        SpecificDatumReader<?> reader = (SpecificDatumReader<?>)readerMap.get(schemaId);
+        if (reader == null) {
+          reader = new SpecificDatumReader(schema);// ignore dirty bits
+          SpecificDatumReader localReader=null;
+          if((localReader=readerMap.putIfAbsent(schemaId, reader))!=null) {
+            reader = localReader;
+          }
+        }
+        
+        // initialize a decoder, possibly reusing previous one
+        BinaryDecoder decoderFromCache = decoders.get();
+        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder((byte[])value, null);
+        // put in threadlocal cache if the initial get was empty
+        if (decoderFromCache==null) {
+          decoders.set(decoder);
+        }
+        try {
+          value = reader.read(null, decoder);
+        } catch (IOException e) {
+          // TODO Auto-generated catch block
+          e.printStackTrace();
+        }        
+      }
     }
     return value;
   }

Modified: gora/branches/GORA_94/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraResult.java
URL: http://svn.apache.org/viewvc/gora/branches/GORA_94/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraResult.java?rev=1568028&r1=1568027&r2=1568028&view=diff
==============================================================================
--- gora/branches/GORA_94/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraResult.java (original)
+++ gora/branches/GORA_94/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraResult.java Thu Feb 13 20:19:22 2014
@@ -72,7 +72,7 @@ public class CassandraResult<K, T extend
     
     for (int iCnt = 0; iCnt < pCassandraRow.length; iCnt++){
       CassandraColumn cColumn = (CassandraColumn)pCassandraRow[iCnt];
-      String columnName = StringSerializer.get().fromByteBuffer(cColumn.getName());
+      String columnName = StringSerializer.get().fromByteBuffer(cColumn.getName().duplicate());
       if (pFieldName.equals(columnName))
         return cColumn;
     }
@@ -95,46 +95,50 @@ public class CassandraResult<K, T extend
     List<Field> fields = schema.getFields();
     
     for (CassandraColumn cassandraColumn: cassandraRow) {
-      
       // get field name
-      String family = cassandraColumn.getFamily();
-      String fieldName = this.reverseMap.get(family + ":" + 
-          StringSerializer.get().fromByteBuffer(cassandraColumn.getName()));
+      String family = cassandraColumn.getFamily();  
+      
+      String fieldName = this.reverseMap.get(family + ":" + StringSerializer.get().fromByteBuffer(cassandraColumn.getName().duplicate()));
       
-      if (fieldName != null ){
+      if (fieldName != null) {
         // get field
-        int pos = this.persistent.getSchema().getField(fieldName).pos();
-        Field field = fields.get(pos);
-        Type fieldType = field.schema().getType();
-        //LOG.info(StringSerializer.get().fromByteBuffer(cassandraColumn.getName()) + fieldName + " " + fieldType.name());
-        if (fieldType == Type.UNION){
-          // getting UNION stored type
-          int posUnionType = this.persistent.getSchema().getField(fieldName + CassandraStore.UNION_COL_SUFIX).pos();
-          Field fieldUnionType = fields.get(posUnionType);
-          CassandraColumn cc = getUnionTypeColumn(fieldName + CassandraStore.UNION_COL_SUFIX, cassandraRow.toArray());
-          // get value of UNION stored type
-          cc.setField(fieldUnionType);
-          Object val = cc.getValue();
-          this.persistent.put(posUnionType, val);
+        if (fieldName.indexOf(CassandraStore.UNION_COL_SUFIX) < 0) {
+
+          int pos = this.persistent.getSchema().getField(fieldName).pos();
+          Field field = fields.get(pos);
+          Type fieldType = field.schema().getType();
+          // LOG.info(StringSerializer.get().fromByteBuffer(cassandraColumn.getName())
+          // + fieldName + " " + fieldType.name());
+          if (fieldType.equals(Type.UNION)) {
+            //getting UNION stored type
+            CassandraColumn cc = getUnionTypeColumn(fieldName
+                + CassandraStore.UNION_COL_SUFIX, cassandraRow.toArray());
+            //creating temporary UNION Field
+            Field unionField = new Field(fieldName
+                + CassandraStore.UNION_COL_SUFIX, Schema.create(Type.INT),
+                null, null);
+            // get value of UNION stored type
+            cc.setField(unionField);
+            Object val = cc.getValue();
+            cassandraColumn.setUnionType(Integer.parseInt(val.toString()));
+          }
+
+          // get value
+          cassandraColumn.setField(field);
+          Object value = cassandraColumn.getValue();
+
+          this.persistent.put(pos, value);
           // this field does not need to be written back to the store
-          this.persistent.clearDirty(posUnionType);
-          cassandraColumn.setUnionType(Integer.parseInt(val.toString()));
+          this.persistent.clearDirty(pos);
         }
-
-        // get value
-        cassandraColumn.setField(field);
-        Object value = cassandraColumn.getValue();
-
-        this.persistent.put(pos, value);
-        // this field does not need to be written back to the store
-        this.persistent.clearDirty(pos);
-      }
-      else
+      } else
         LOG.debug("FieldName was null while iterating CassandraRow and using Avro Union type");
     }
 
   }
 
+  //TODO Should we remove this method?
+  @SuppressWarnings("unused")
   private int getNonNullTypePos(List<Schema> pTypes){
     int iCnt = 0;
     for (Schema sch :  pTypes)

Modified: gora/branches/GORA_94/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSubColumn.java
URL: http://svn.apache.org/viewvc/gora/branches/GORA_94/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSubColumn.java?rev=1568028&r1=1568027&r2=1568028&view=diff
==============================================================================
--- gora/branches/GORA_94/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSubColumn.java (original)
+++ gora/branches/GORA_94/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSubColumn.java Thu Feb 13 20:19:22 2014
@@ -19,8 +19,6 @@
 package org.apache.gora.cassandra.query;
 
 import java.nio.ByteBuffer;
-import java.nio.charset.Charset;
-import java.nio.charset.CharsetEncoder;
 import java.util.List;
 import java.util.Map;
 
@@ -38,11 +36,6 @@ import org.slf4j.LoggerFactory;
 public class CassandraSubColumn extends CassandraColumn {
   public static final Logger LOG = LoggerFactory.getLogger(CassandraSubColumn.class);
 
-  private static final String ENCODING = "UTF-8";
-  
-  private static CharsetEncoder charsetEncoder = Charset.forName(ENCODING).newEncoder();;
-
-
   /**
    * Key-value pair containing the raw data.
    */
@@ -52,36 +45,46 @@ public class CassandraSubColumn extends 
     return hColumn.getName();
   }
 
-  /**
-   * Deserialize a String into an typed Object, according to the field schema.
-   * @see org.apache.gora.cassandra.query.CassandraColumn#getValue()
-   */
-  public Object getValue() {
-    Field field = getField();
-    Schema fieldSchema = field.schema();
-    Type type = fieldSchema.getType();
-    ByteBuffer byteBuffer = hColumn.getValue();
-    if (byteBuffer == null) {
-      return null;
-    }
+  private Object getFieldValue(Type type, Schema fieldSchema, ByteBuffer byteBuffer){
     Object value = null;
-    if (type == Type.ARRAY) {
+    if (type.equals(Type.ARRAY)) {
       ListSerializer<?> serializer = ListSerializer.get(fieldSchema.getElementType());
       List<?> genericArray = serializer.fromByteBuffer(byteBuffer);
       value = genericArray;
-    } else if (type == Type.MAP) {
+    } else if (type.equals(Type.MAP)) {
       MapSerializer<?> serializer = MapSerializer.get(fieldSchema.getValueType());
       Map<?, ?> map = serializer.fromByteBuffer(byteBuffer);
       value = map;
-    } else if (type == Type.UNION){
+    } else if (type.equals(Type.RECORD)){
+      value = fromByteBuffer(fieldSchema, byteBuffer);
+      //TODO: Avro dan geri getirmek lazim.
+    } else if (type.equals(Type.UNION)){
       // the selected union schema is obtained
-      Schema unionFieldSchema = getUnionSchema(super.getUnionType(), field.schema());
+      Schema unionFieldSchema = getUnionSchema(super.getUnionType(), fieldSchema);
+      Type unionFieldType = unionFieldSchema.getType();
       // we use the selected union schema to deserialize our actual value
-      value = fromByteBuffer(unionFieldSchema, byteBuffer);
+      //value = fromByteBuffer(unionFieldSchema, byteBuffer);
+      value = getFieldValue(unionFieldType, unionFieldSchema, byteBuffer);
     } else {
       value = fromByteBuffer(fieldSchema, byteBuffer);
     }
+    return value;
+  }
+
+  /**
+   * Deserialize a String into an typed Object, according to the field schema.
+   * @see org.apache.gora.cassandra.query.CassandraColumn#getValue()
+   */
+  public Object getValue() {
+    Field field = getField();
+    Schema fieldSchema = field.schema();
+    Type type = fieldSchema.getType();
+    ByteBuffer byteBuffer = hColumn.getValue();
+    if (byteBuffer == null) {
+      return null;
+    }
 
+    Object value = getFieldValue(type, fieldSchema, byteBuffer);
     return value;
   }
   

Modified: gora/branches/GORA_94/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSuperColumn.java
URL: http://svn.apache.org/viewvc/gora/branches/GORA_94/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSuperColumn.java?rev=1568028&r1=1568027&r2=1568028&view=diff
==============================================================================
--- gora/branches/GORA_94/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSuperColumn.java (original)
+++ gora/branches/GORA_94/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSuperColumn.java Thu Feb 13 20:19:22 2014
@@ -32,6 +32,7 @@ import org.apache.avro.Schema;
 import org.apache.avro.Schema.Field;
 import org.apache.avro.Schema.Type;
 import org.apache.gora.cassandra.serializers.CharSequenceSerializer;
+import org.apache.gora.cassandra.store.CassandraStore;
 import org.apache.gora.persistency.impl.PersistentBase;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -45,11 +46,7 @@ public class CassandraSuperColumn extend
     return StringSerializer.get().toByteBuffer(hSuperColumn.getName());
   }
 
-  public Object getValue() {
-    Field field = getField();
-    Schema fieldSchema = field.schema();
-    Type type = fieldSchema.getType();
-    
+ private Object getSuperValue(Field field, Schema fieldSchema, Type type){
     Object value = null;
     
     switch (type) {
@@ -102,21 +99,76 @@ public class CassandraSuperColumn extend
 
           for (HColumn<ByteBuffer, ByteBuffer> hColumn : this.hSuperColumn.getColumns()) {
             String memberName = StringSerializer.get().fromByteBuffer(hColumn.getName());
+            if (memberName.indexOf(CassandraStore.UNION_COL_SUFIX) < 0) {
+              
             if (memberName == null || memberName.length() == 0) {
               LOG.warn("member name is null or empty.");
               continue;
             }
             Field memberField = fieldSchema.getField(memberName);
+            Schema memberSchema = memberField.schema();
+            Type memberType = memberSchema.getType();
+            
             CassandraSubColumn cassandraColumn = new CassandraSubColumn();
             cassandraColumn.setField(memberField);
             cassandraColumn.setValue(hColumn);
+            
+            if (memberType.equals(Type.UNION)){
+              HColumn<ByteBuffer, ByteBuffer> hc = getUnionTypeColumn(memberField.name()
+                  + CassandraStore.UNION_COL_SUFIX, this.hSuperColumn.getColumns().toArray());
+              Field unionField = new Field(memberField.name()
+                  + CassandraStore.UNION_COL_SUFIX, Schema.create(Type.INT),
+                  null, null);
+              
+              CassandraSubColumn unionColumn = new CassandraSubColumn();
+              
+              // get value of UNION stored type
+              unionColumn.setField(unionField);
+              unionColumn.setValue(hc);
+              Object val = unionColumn.getValue();
+              cassandraColumn.setUnionType(Integer.parseInt(val.toString()));
+            }
+            
             record.put(record.getSchema().getField(memberName).pos(), cassandraColumn.getValue());
           }
+          }
         }
         break;
+      case UNION:
+        int schemaPos = this.getUnionType();
+        Schema unioSchema = fieldSchema.getTypes().get(schemaPos);
+        Type unionType = unioSchema.getType();
+        value = getSuperValue(field, unioSchema, unionType);
+        break;
       default:
+        Object memberValue = null;
+        // Using for UnionIndex of Union type field get value. UnionIndex always Integer.  
+        for (HColumn<ByteBuffer, ByteBuffer> hColumn : this.hSuperColumn.getColumns()) {
+          memberValue = fromByteBuffer(fieldSchema, hColumn.getValue());      
+        }
+        value = memberValue;
         LOG.warn("Type: " + type.name() + " not supported for field: " + field.name());
     }
+    return value;
+  }
+
+  private HColumn<ByteBuffer, ByteBuffer> getUnionTypeColumn(String fieldName, Object[] hColumns) {
+    for (int iCnt = 0; iCnt < hColumns.length; iCnt++){
+      @SuppressWarnings("unchecked")
+      HColumn<ByteBuffer, ByteBuffer> hColumn = (HColumn<ByteBuffer, ByteBuffer>)hColumns[iCnt];
+      String columnName = StringSerializer.get().fromByteBuffer(hColumn.getNameBytes().duplicate());
+      if (fieldName.equals(columnName))
+        return hColumn;
+    }
+    return null;
+}
+
+  public Object getValue() {
+    Field field = getField();
+    Schema fieldSchema = field.schema();
+    Type type = fieldSchema.getType();
+    
+    Object value = getSuperValue(field, fieldSchema, type);
     
     return value;
   }

Modified: gora/branches/GORA_94/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/CharSequenceSerializer.java
URL: http://svn.apache.org/viewvc/gora/branches/GORA_94/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/CharSequenceSerializer.java?rev=1568028&r1=1568027&r2=1568028&view=diff
==============================================================================
--- gora/branches/GORA_94/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/CharSequenceSerializer.java (original)
+++ gora/branches/GORA_94/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/CharSequenceSerializer.java Thu Feb 13 20:19:22 2014
@@ -22,6 +22,8 @@ import static me.prettyprint.hector.api.
 
 import java.nio.ByteBuffer;
 
+import org.apache.avro.util.Utf8;
+
 import me.prettyprint.cassandra.serializers.AbstractSerializer;
 import me.prettyprint.cassandra.serializers.StringSerializer;
 import me.prettyprint.hector.api.ddl.ComparatorType;
@@ -46,11 +48,12 @@ public final class CharSequenceSerialize
   }
 
   @Override
-  public CharSequence fromByteBuffer(ByteBuffer byteBuffer) {
+  //TODO: CharSequence cause Test Fail. All tests set UTF8. When change test set type. This will be CharSequence 
+  public Utf8 fromByteBuffer(ByteBuffer byteBuffer) {
     if (byteBuffer == null) {
       return null;
     }
-    return StringSerializer.get().fromByteBuffer(byteBuffer);
+    return new Utf8(StringSerializer.get().fromByteBuffer(byteBuffer));
   }
 
   @Override

Modified: gora/branches/GORA_94/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/GoraSerializerTypeInferer.java
URL: http://svn.apache.org/viewvc/gora/branches/GORA_94/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/GoraSerializerTypeInferer.java?rev=1568028&r1=1568027&r2=1568028&view=diff
==============================================================================
--- gora/branches/GORA_94/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/GoraSerializerTypeInferer.java (original)
+++ gora/branches/GORA_94/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/GoraSerializerTypeInferer.java Thu Feb 13 20:19:22 2014
@@ -28,6 +28,7 @@ import me.prettyprint.cassandra.serializ
 import me.prettyprint.cassandra.serializers.FloatSerializer;
 import me.prettyprint.cassandra.serializers.IntegerSerializer;
 import me.prettyprint.cassandra.serializers.LongSerializer;
+import me.prettyprint.cassandra.serializers.ObjectSerializer;
 import me.prettyprint.cassandra.serializers.SerializerTypeInferer;
 import me.prettyprint.cassandra.serializers.StringSerializer;
 import me.prettyprint.hector.api.Serializer;
@@ -37,6 +38,7 @@ import org.apache.avro.Schema.Type;
 import org.apache.avro.generic.GenericArray;
 import org.apache.avro.specific.SpecificFixed;
 import org.apache.avro.util.Utf8;
+import org.apache.gora.persistency.Persistent;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -89,7 +91,10 @@ public class GoraSerializerTypeInferer {
         Schema schema = TypeUtils.getSchema(value0);
         serializer = MapSerializer.get(schema);
       }
-    } else {
+    } else if (value instanceof Persistent){
+      serializer = ObjectSerializer.get();
+    }
+    else {
       serializer = SerializerTypeInferer.getSerializer(value);
     }
     return serializer;
@@ -124,30 +129,32 @@ public class GoraSerializerTypeInferer {
   public static <T> Serializer<T> getSerializer(Schema schema) {
     Serializer serializer = null;
     Type type = schema.getType();
-    if (type == Type.STRING) {
+    if (type.equals(Type.STRING)) {
       serializer = CharSequenceSerializer.get();
-    } else if (type == Type.BOOLEAN) {
+    } else if (type.equals(Type.BOOLEAN)) {
       serializer = BooleanSerializer.get();
-    } else if (type == Type.BYTES) {
+    } else if (type.equals(Type.BYTES)) {
       serializer = ByteBufferSerializer.get();
-    } else if (type == Type.DOUBLE) {
+    } else if (type.equals(Type.DOUBLE)) {
       serializer = DoubleSerializer.get();
-    } else if (type == Type.FLOAT) {
+    } else if (type.equals(Type.FLOAT)) {
       serializer = FloatSerializer.get();
-    } else if (type == Type.INT) {
+    } else if (type.equals(Type.INT)) {
       serializer = IntegerSerializer.get();
-    } else if (type == Type.LONG) {
+    } else if (type.equals(Type.LONG)) {
       serializer = LongSerializer.get();
-    } else if (type == Type.FIXED) {
+    } else if (type.equals(Type.FIXED)) {
       Class clazz = TypeUtils.getClass(schema);
       serializer = SpecificFixedSerializer.get(clazz);
       // serializer = SpecificFixedSerializer.get(schema);
-    } else if (type == Type.ARRAY) {
+    } else if (type.equals(Type.ARRAY)) {
       serializer = ListSerializer.get(schema.getElementType());
-    } else if (type == Type.MAP) {
+    } else if (type.equals(Type.MAP)) {
        serializer = MapSerializer.get(schema.getValueType());
-    } else if (type == Type.UNION){
+    } else if (type.equals(Type.UNION)){
       serializer = ByteBufferSerializer.get();
+    } else if (type.equals(Type.RECORD)){
+      serializer = BytesArraySerializer.get();
     } else {
       serializer = null;
     }

Modified: gora/branches/GORA_94/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java
URL: http://svn.apache.org/viewvc/gora/branches/GORA_94/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java?rev=1568028&r1=1568027&r2=1568028&view=diff
==============================================================================
--- gora/branches/GORA_94/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java (original)
+++ gora/branches/GORA_94/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java Thu Feb 13 20:19:22 2014
@@ -18,6 +18,7 @@
 
 package org.apache.gora.cassandra.store;
 
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -28,6 +29,7 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
 import java.util.Collections;
+import java.util.concurrent.ConcurrentHashMap;
 
 import me.prettyprint.hector.api.beans.ColumnSlice;
 import me.prettyprint.hector.api.beans.HColumn;
@@ -40,7 +42,13 @@ import org.apache.avro.Schema;
 import org.apache.avro.Schema.Field;
 import org.apache.avro.Schema.Type;
 import org.apache.avro.generic.GenericArray;
+import org.apache.avro.generic.GenericData.Array;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.EncoderFactory;
 import org.apache.avro.specific.SpecificData;
+import org.apache.avro.specific.SpecificDatumWriter;
+import org.apache.avro.util.Utf8;
+import org.apache.commons.lang.ArrayUtils;
 import org.apache.gora.cassandra.query.CassandraQuery;
 import org.apache.gora.cassandra.query.CassandraResult;
 import org.apache.gora.cassandra.query.CassandraResultSet;
@@ -48,6 +56,7 @@ import org.apache.gora.cassandra.query.C
 import org.apache.gora.cassandra.query.CassandraSubColumn;
 import org.apache.gora.cassandra.query.CassandraSuperColumn;
 import org.apache.gora.persistency.Persistent;
+import org.apache.gora.persistency.impl.DirtyListWrapper;
 import org.apache.gora.persistency.impl.PersistentBase;
 import org.apache.gora.query.PartitionQuery;
 import org.apache.gora.query.Query;
@@ -60,7 +69,7 @@ import org.slf4j.LoggerFactory;
 /**
  * {@link org.apache.gora.cassandra.store.CassandraStore} is the primary class 
  * responsible for directing Gora CRUD operations into Cassandra. We (delegate) rely 
- * heavily on {@ link org.apache.gora.cassandra.store.CassandraClient} for many operations
+ * heavily on {@link org.apache.gora.cassandra.store.CassandraClient} for many operations
  * such as initialization, creating and deleting schemas (Cassandra Keyspaces), etc.  
  */
 public class CassandraStore<K, T extends PersistentBase> extends DataStoreBase<K, T> {
@@ -71,13 +80,13 @@ public class CassandraStore<K, T extends
   private CassandraClient<K, T>  cassandraClient = new CassandraClient<K, T>();
 
   /**
-   * Fixed string used to generate an extra column based on 
+   * Fixed string with value "UnionIndex" used to generate an extra column based on 
    * the original field's name
    */
   public static String UNION_COL_SUFIX = "UnionIndex";
 
   /**
-   * Default schema index used when AVRO Union data types are stored
+   * Default schema index with value "0" used when AVRO Union data types are stored
    */
   public static int DEFAULT_UNION_SCHEMA = 0;
 
@@ -90,9 +99,29 @@ public class CassandraStore<K, T extends
    */
   private Map<K, T> buffer = Collections.synchronizedMap(new LinkedHashMap<K, T>());
 
+  /**
+   * Threadlocals maintaining reusable binary decoders and encoders.
+   */
+  private static ThreadLocal<ByteArrayOutputStream> outputStream =
+      new ThreadLocal<ByteArrayOutputStream>();
+  
+  public static final ThreadLocal<BinaryEncoder> encoders =
+      new ThreadLocal<BinaryEncoder>();
+  
+  /**
+   * Create a {@link java.util.concurrent.ConcurrentHashMap} for the 
+   * datum readers and writers. 
+   * This is necessary because they are not thread safe, at least not before 
+   * Avro 1.4.0 (See AVRO-650).
+   * When they are thread safe, it is possible to maintain a single reader and
+   * writer pair for every schema, instead of one for every thread.
+   * @see <a href="https://issues.apache.org/jira/browse/AVRO-650">AVRO-650</a>
+   */
+  public static final ConcurrentHashMap<String, SpecificDatumWriter<?>> writerMap = 
+      new ConcurrentHashMap<String, SpecificDatumWriter<?>>();
+  
   /** The default constructor for CassandraStore */
   public CassandraStore() throws Exception {
-    // this.cassandraClient.initialize();
   }
 
   /** 
@@ -186,7 +215,7 @@ public class CassandraStore<K, T extends
    * partitioned across nodes based on row Key.
    */
   private void addSubColumns(String family, CassandraQuery<K, T> cassandraQuery,
-      CassandraResultSet cassandraResultSet) {
+      CassandraResultSet<K> cassandraResultSet) {
     // select family columns that are included in the query
     List<Row<K, ByteBuffer, ByteBuffer>> rows = this.cassandraClient.execute(cassandraQuery, family);
 
@@ -219,14 +248,14 @@ public class CassandraStore<K, T extends
    * partitioned across nodes based on row Key.
    */
   private void addSuperColumns(String family, CassandraQuery<K, T> cassandraQuery, 
-      CassandraResultSet cassandraResultSet) {
+      CassandraResultSet<K> cassandraResultSet) {
 
     List<SuperRow<K, String, ByteBuffer, ByteBuffer>> superRows = this.cassandraClient.executeSuper(cassandraQuery, family);
     for (SuperRow<K, String, ByteBuffer, ByteBuffer> superRow: superRows) {
       K key = superRow.getKey();
       CassandraRow<K> cassandraRow = cassandraResultSet.getRow(key);
       if (cassandraRow == null) {
-        cassandraRow = new CassandraRow();
+        cassandraRow = new CassandraRow<K>();
         cassandraResultSet.putRow(key, cassandraRow);
         cassandraRow.setKey(key);
       }
@@ -270,7 +299,7 @@ public class CassandraStore<K, T extends
 
       for (Field field: schema.getFields()) {
         if (value.isDirty(field.pos())) {
-          addOrUpdateField(key, field, value.get(field.pos()));
+          addOrUpdateField(key, field, field.schema(), value.get(field.pos()));
         }
       }
     }
@@ -287,7 +316,22 @@ public class CassandraStore<K, T extends
     CassandraQuery<K,T> query = new CassandraQuery<K,T>();
     query.setDataStore(this);
     query.setKeyRange(key, key);
-    query.setFields(fields);
+    
+    
+    // Generating UnionFields
+    ArrayList<String> unionFields = new ArrayList<String>();
+    for (String field: fields){
+      Field schemaField =this.fieldMap.get(field);
+      Type type = schemaField.schema().getType();
+      if (type.getName().equals("UNION".toLowerCase())){
+        unionFields.add(field+UNION_COL_SUFIX);
+      }
+    }
+    
+    String[] arr = unionFields.toArray(new String[unionFields.size()]);
+    String[] both = (String[]) ArrayUtils.addAll(fields, arr);
+    
+    query.setFields(both);
     query.setLimit(1);
     Result<K,T> result = execute(query);
     boolean hasResult = false;
@@ -339,17 +383,21 @@ public class CassandraStore<K, T extends
    * <li>Create a new duplicate instance of the object (explained in more detail below) **.</li>
    * <li>Obtain a {@link java.util.List} of the {@link org.apache.avro.Schema} 
    * {@link org.apache.avro.Schema.Field}'s.</li>
-   * <li>Iterate through the {@link java.util.List}. This allows us to process
-   * each item appropriately.</li>
-   * <li>Check to see if the {@link org.apache.avro.Schema.Field} is either at 
-   * position 0 OR it is NOT dirty. If one of these conditions is true then we DO NOT
-   * process this field.</li>
-   * <li>Obtain the element at the specified position in this list.</li>
+   * <li>Iterate through the field {@link java.util.List}. This allows us to 
+   * consequently process each item.</li>
+   * <li>Check to see if the {@link org.apache.avro.Schema.Field} is NOT dirty. 
+   * If this condition is true then we DO NOT process this field.</li>
+   * <li>Obtain the element at the specified position in this list so we can 
+   * directly operate on it.</li>
    * <li>Obtain the {@link org.apache.avro.Schema.Type} of the element obtained 
-   * above and process it accordingly. N.B. For nested type RECORD we shadow 
-   * the checks to see if the {@link org.apache.avro.Schema.Field} is either at 
+   * above and process it accordingly. N.B. For nested type ARRAY, MAP
+   * RECORD or UNION, we shadow the checks in bullet point 5 above to infer that the 
+   * {@link org.apache.avro.Schema.Field} is either at 
    * position 0 OR it is NOT dirty. If one of these conditions is true then we DO NOT
-   * process this field.</li>
+   * process this field. This is carried out in 
+   * {@link org.apache.gora.cassandra.store.CassandraStore#getFieldValue(Schema, Type, Object)}</li>
+   * <li>We then insert the Key and Object into the {@link java.util.LinkedHashMap} buffer 
+   * before being flushed. This performs a structural modification of the map.</li>
    * </ol>
    * ** We create a duplicate instance of the object to be persisted and insert processed
    * objects into a synchronized {@link java.util.LinkedHashMap}. This allows 
@@ -372,37 +420,9 @@ public class CassandraStore<K, T extends
       Field field = fields.get(i);
       Type type = field.schema().getType();
       Object fieldValue = value.get(field.pos());
+      Schema fieldSchema = field.schema();
       // check if field has a nested structure (array, map, record or union)
-
-      switch(type) {
-      case RECORD:
-        Persistent persistent = (Persistent) fieldValue;
-        Persistent newRecord = (Persistent) 
SpecificData.get().newRecord(persistent, persistent.getSchema());
-        for (Field member: field.schema().getFields()) {
-          if (member.pos() == 0 || !persistent.isDirty()) {
-            continue;
-          }
-          newRecord.put(member.pos(), persistent.get(member.pos()));
-        }
-        fieldValue = newRecord;
-        break;
-      case MAP:
-        Map<?, ?> map = (Map<?, ?>) fieldValue;
-        fieldValue = map;
-        break;
-      case ARRAY:
-        fieldValue = (List<?>) fieldValue;
-        break;
-      case UNION:
-        // storing the union selected schema, the actual value will 
-        // be stored as soon as we get break out.
-        int schemaPos = getUnionSchema(fieldValue,field.schema());
-        p.put( schemaPos, p.getSchema().getField(field.name() + 
CassandraStore.UNION_COL_SUFIX));
-        //p.put(fieldPos, fieldValue);
-        break;
-      default:
-        break;
-      }
+      fieldValue = getFieldValue(fieldSchema, type, fieldValue);
       p.put(field.pos(), fieldValue);
     }
     // this performs a structural modification of the map
@@ -410,14 +430,64 @@ public class CassandraStore<K, T extends
   }
 
   /**
+   * For every field within an object, we pass in a field schema, Type and 
value.
+   * This enables us to process fields (based on their characteristics), 
+   * preparing them for persistence. RECORD members and UNION values are processed recursively.
+   * @param fieldSchema the associated field schema (for a UNION, the union schema itself)
+   * @param type the field type, which selects the branch of the switch below
+   * @param fieldValue the field value; a null UNION value is returned unchanged.
+   * @return the value to persist: for RECORD the record obtained from SpecificData#newRecord with its members processed, for UNION the value processed against its resolved branch, otherwise the input value
+   */
+  private Object getFieldValue(Schema fieldSchema, Type type, Object 
fieldValue ){
+    switch(type) {
+    case RECORD:
+      Persistent persistent = (Persistent) fieldValue;
+      Persistent newRecord = (Persistent) 
SpecificData.get().newRecord(persistent, persistent.getSchema());
+      for (Field member: fieldSchema.getFields()) {
+        if (member.pos() == 0 || !persistent.isDirty()) { // skip position 0; skip every member when the record is not dirty
+          continue;
+        }
+        Schema memberSchema = member.schema();
+        Type memberType = memberSchema.getType();
+        Object memberValue = persistent.get(member.pos());
+        newRecord.put(member.pos(), getFieldValue(memberSchema, memberType, 
memberValue));
+      }
+      fieldValue = newRecord;
+      break;
+    case MAP:
+      Map<?, ?> map = (Map<?, ?>) fieldValue;
+      fieldValue = map; // the map is passed through unchanged (cast only)
+      break;
+    case ARRAY:
+      fieldValue = (List<?>) fieldValue; // the list is passed through unchanged (cast only)
+      break;
+    case UNION:
+      // resolve which branch of the union matches the value, then process the 
+      // value against that branch's schema; a null value is returned as-is.
+      if (fieldValue != null){
+        int schemaPos = getUnionSchema(fieldValue,fieldSchema);
+        Schema unionSchema = fieldSchema.getTypes().get(schemaPos);
+        Type unionType = unionSchema.getType();
+        fieldValue = getFieldValue(unionSchema, unionType, fieldValue);
+      }
+      //p.put( schemaPos, p.getSchema().getField(field.name() + 
CassandraStore.UNION_COL_SUFIX));
+      //p.put(fieldPos, fieldValue);
+      break;
+    default:
+      break;
+    }    
+    return fieldValue;
+  }
+  
+  /**
    * Add a field to Cassandra according to its type.
    * @param key     the key of the row where the field should be added
    * @param field   the Avro field representing a datum
    * @param value   the field value
    */
-  @SuppressWarnings({ "unchecked", "null" })
-  private void addOrUpdateField(K key, Field field, Object value) {
-    Schema schema = field.schema();
+  @SuppressWarnings({ "unchecked", "rawtypes" })
+  private void addOrUpdateField(K key, Field field, Schema schema, Object 
value) {
+    //Schema schema = field.schema();
     Type type = schema.getType();
     // checking if the value to be updated is used for saving union schema
     if (field.name().indexOf(CassandraStore.UNION_COL_SUFIX) < 0){
@@ -436,22 +506,74 @@ public class CassandraStore<K, T extends
         if (value != null) {
           if (value instanceof PersistentBase) {
             PersistentBase persistentBase = (PersistentBase) value;
-            for (Field member: schema.getFields()) {
-
-              // TODO: hack, do not store empty arrays
-              Object memberValue = persistentBase.get(member.pos());
-              if (memberValue instanceof List<?>) {
-                if (((List<?>)memberValue).size() == 0) {
-                  continue;
-                }
-              } else if (memberValue instanceof Map<?,?>) {
-                if (((Map<?, ?>)memberValue).size() == 0) {
-                  continue;
-                }
-              }
-              this.cassandraClient.addSubColumn(key, field.name(), 
-                  member.name(), memberValue);
+            
+            SpecificDatumWriter writer = (SpecificDatumWriter<?>) 
writerMap.get(schema.getFullName());
+            if (writer == null) {
+              writer = new SpecificDatumWriter(schema);// ignore dirty bits
+              writerMap.put(schema.getFullName(),writer);
+            }
+            
+            BinaryEncoder encoderFromCache = encoders.get();
+            ByteArrayOutputStream bos = new ByteArrayOutputStream();
+            outputStream.set(bos);
+            BinaryEncoder encoder = 
EncoderFactory.get().directBinaryEncoder(bos, null);
+            if (encoderFromCache == null) {
+              encoders.set(encoder);
             }
+            
+            //reset the buffers
+            ByteArrayOutputStream os = outputStream.get();
+            os.reset();
+            
+            try {
+              writer.write(persistentBase, encoder);
+              encoder.flush();
+            } catch (IOException e) {
+              // TODO Auto-generated catch block
+              e.printStackTrace();
+            }
+            byte[] byteValue = os.toByteArray();
+          
+            //String familyName = 
this.cassandraClient.getCassandraMapping().getFamily(field.name());
+//            if (this.cassandraClient.isSuper( familyName )){
+//              this.cassandraClient.addSubColumn(key, columnName, columnName, 
schemaPos);            
+//            }else{
+//              
+//              
+//            }            
+            this.cassandraClient.addColumn(key, field.name(), byteValue);
+            
+//            for (Field member: schema.getFields()) {
+//              if (member.pos() == 0) {
+//                continue;
+//              }
+//              // TODO: hack, do not store empty arrays
+//              Object memberValue = persistentBase.get(member.pos());
+//              if (memberValue instanceof List<?>) {
+//                if (((List<?>)memberValue).size() == 0) {
+//                  continue;
+//                }
+//              } else if (memberValue instanceof Map<?,?>) {
+//                if (((Map<?, ?>)memberValue).size() == 0) {
+//                  continue;
+//                }
+//              }
+//              if (memberValue == null){
+//                continue;
+//              }
+//              
+//              // Get type for Union Fields
+//              Schema memberSchema = member.schema();
+//              Type fieldType = memberSchema.getType();
+//              if (fieldType.equals(Type.UNION)){
+//                int schemaPos = getUnionSchema(memberValue, memberSchema);
+//                this.cassandraClient.addSubColumn(key, field.name(), 
+//                    member.name()+UNION_COL_SUFIX, schemaPos);
+//              }
+//              
+//              this.cassandraClient.addSubColumn(key, field.name(), 
+//                  member.name(), memberValue);
+//            }
           } else {
             LOG.warn("Record with value: " + value.toString() + " not 
supported for field: " + field.name());
           }
@@ -468,8 +590,13 @@ public class CassandraStore<K, T extends
         break;
       case ARRAY:
         if (value != null) {
-          if (value instanceof GenericArray<?>) {
-            this.cassandraClient.addGenericArray(key, field.name(), 
(GenericArray<?>)value);
+          if (value instanceof DirtyListWrapper<?>) {
+            DirtyListWrapper fieldValue = (DirtyListWrapper<?>)value;
+            GenericArray valueArray = new Array(fieldValue.size(), schema);
+            for (int i = 0; i < fieldValue.size(); i++) {
+              valueArray.add(i, fieldValue.get(i));
+            }
+            this.cassandraClient.addGenericArray(key, field.name(), 
(GenericArray<?>)valueArray);
           } else {
             LOG.warn("Array with value: " + value.toString() + " not supported 
for field: " + field.name());
           }
@@ -477,14 +604,23 @@ public class CassandraStore<K, T extends
         break;
       case UNION:
         if(value != null) {
-          LOG.debug("Union with value: " + value.toString() + " at index: " + 
getUnionSchema(value, schema) + " supported for field: " + field.name());
+          int schemaPos = getUnionSchema(value, schema);
+          LOG.debug("Union with value: " + value.toString() + " at index: " + 
schemaPos + " supported for field: " + field.name());
           // adding union schema index
           String columnName = field.name() + UNION_COL_SUFIX;
           String familyName = 
this.cassandraClient.getCassandraMapping().getFamily(field.name());
           this.cassandraClient.getCassandraMapping().addColumn(familyName, 
columnName, columnName);
-          this.cassandraClient.addColumn(key, columnName, 
getUnionSchema(value, schema));
+          if (this.cassandraClient.isSuper( familyName )){
+            this.cassandraClient.addSubColumn(key, columnName, columnName, 
schemaPos);            
+          }else{
+            this.cassandraClient.addColumn(key, columnName, schemaPos);
+            
+          }
+//          this.cassandraClient.getCassandraMapping().addColumn(familyName, 
columnName, columnName);
           // adding union value
-          this.cassandraClient.addColumn(key, field.name(), value);
+          Schema unioSchema = schema.getTypes().get(schemaPos);
+          addOrUpdateField(key, field, unioSchema, value);
+          //this.cassandraClient.addColumn(key, field.name(), value);
         } else {
           LOG.warn("Union with 'null' value not supported for field: " + 
field.name());
         }
@@ -506,24 +642,30 @@ public class CassandraStore<K, T extends
    */
   private int getUnionSchema(Object pValue, Schema pUnionSchema){
     int unionSchemaPos = 0;
-    String valueType = pValue.getClass().getSimpleName();
+//    String valueType = pValue.getClass().getSimpleName();
     Iterator<Schema> it = pUnionSchema.getTypes().iterator();
     while ( it.hasNext() ){
-      String schemaName = it.next().getName();
-      if (valueType.equals("Utf8") && 
schemaName.equals(Type.STRING.name().toLowerCase()))
+      Type schemaType = it.next().getType();
+      if (pValue instanceof Utf8 && schemaType.equals(Type.STRING))
         return unionSchemaPos;
-      else if (valueType.equals("HeapByteBuffer") && 
schemaName.equals(Type.STRING.name().toLowerCase()))
+      else if (pValue instanceof ByteBuffer && schemaType.equals(Type.BYTES))
         return unionSchemaPos;
-      else if (valueType.equals("Integer") && 
schemaName.equals(Type.INT.name().toLowerCase()))
+      else if (pValue instanceof Integer && schemaType.equals(Type.INT))
         return unionSchemaPos;
-      else if (valueType.equals("Long") && 
schemaName.equals(Type.LONG.name().toLowerCase()))
+      else if (pValue instanceof Long && schemaType.equals(Type.LONG))
         return unionSchemaPos;
-      else if (valueType.equals("Double") && 
schemaName.equals(Type.DOUBLE.name().toLowerCase()))
+      else if (pValue instanceof Double && schemaType.equals(Type.DOUBLE))
         return unionSchemaPos;
-      else if (valueType.equals("Float") && 
schemaName.equals(Type.FLOAT.name().toLowerCase()))
+      else if (pValue instanceof Float && schemaType.equals(Type.FLOAT))
         return unionSchemaPos;
-      else if (valueType.equals("Boolean") && 
schemaName.equals(Type.BOOLEAN.name().toLowerCase()))
+      else if (pValue instanceof Boolean && schemaType.equals(Type.BOOLEAN))
         return unionSchemaPos;
+      else if (pValue instanceof Map && schemaType.equals(Type.MAP))
+        return unionSchemaPos;  
+      else if (pValue instanceof List && schemaType.equals(Type.ARRAY))
+        return unionSchemaPos;        
+      else if (pValue instanceof Persistent && schemaType.equals(Type.RECORD))
+        return unionSchemaPos;          
       unionSchemaPos ++;
     }
     // if we weren't able to determine which data type it is, then we return 
the default

Modified: 
gora/branches/GORA_94/gora-cassandra/src/test/conf/gora-cassandra-mapping.xml
URL: 
http://svn.apache.org/viewvc/gora/branches/GORA_94/gora-cassandra/src/test/conf/gora-cassandra-mapping.xml?rev=1568028&r1=1568027&r2=1568028&view=diff
==============================================================================
--- 
gora/branches/GORA_94/gora-cassandra/src/test/conf/gora-cassandra-mapping.xml 
(original)
+++ 
gora/branches/GORA_94/gora-cassandra/src/test/conf/gora-cassandra-mapping.xml 
Thu Feb 13 20:19:22 2014
@@ -47,7 +47,7 @@
     <field name="content" family="p" qualifier="p:cnt:c"/>
     <field name="parsedContent" family="sc" qualifier="p:parsedContent"/>
     <field name="outlinks" family="sc" qualifier="p:outlinks"/>
-    <field name="metadata" family="sc" qualifier="c:mt"/>
+    <field name="metadata" family="p" qualifier="c:mt"/>
   </class>
 
   <class name="org.apache.gora.examples.generated.TokenDatum" 
keyClass="java.lang.String" keyspace="TokenDatum">

Modified: 
gora/branches/GORA_94/gora-cassandra/src/test/java/org/apache/gora/cassandra/store/TestCassandraStore.java
URL: 
http://svn.apache.org/viewvc/gora/branches/GORA_94/gora-cassandra/src/test/java/org/apache/gora/cassandra/store/TestCassandraStore.java?rev=1568028&r1=1568027&r2=1568028&view=diff
==============================================================================
--- 
gora/branches/GORA_94/gora-cassandra/src/test/java/org/apache/gora/cassandra/store/TestCassandraStore.java
 (original)
+++ 
gora/branches/GORA_94/gora-cassandra/src/test/java/org/apache/gora/cassandra/store/TestCassandraStore.java
 Thu Feb 13 20:19:22 2014
@@ -68,8 +68,6 @@ public class TestCassandraStore extends 
   }
 
 
-  //We need to skip the following tests for a while until we fix some issues..
-
   @Ignore("skipped until some bugs are fixed")
   @Override
   public void testGetWebPageDefaultFields() throws IOException {}
@@ -88,16 +86,16 @@ public class TestCassandraStore extends 
   @Ignore("skipped until some bugs are fixed")
   @Override
   public void testQueryWebPageSingleKeyDefaultFields() throws IOException {}
-  @Ignore("skipped until some bugs are fixed")
+  @Ignore("GORA-154 delete() and deleteByQuery() methods are not implemented 
at CassandraStore, and always returns false or 0")
   @Override
   public void testDelete() throws IOException {}
-  @Ignore("skipped until some bugs are fixed")
+  @Ignore("GORA-154 delete() and deleteByQuery() methods are not implemented 
at CassandraStore, and always returns false or 0")
   @Override
   public void testDeleteByQuery() throws IOException {}
-  @Ignore("skipped until some bugs are fixed")
+  @Ignore("GORA-154 delete() and deleteByQuery() methods are not implemented 
at CassandraStore, and always returns false or 0")
   @Override
   public void testDeleteByQueryFields() throws IOException {}
-  @Ignore("skipped until some bugs are fixed")
+  @Ignore("GORA-298 Implement CassandraStore#getPartitions")
   @Override
   public void testGetPartitions() throws IOException {}
   @Ignore("skipped until some bugs are fixed")


Reply via email to