Author: lewismc
Date: Thu Feb 13 20:19:22 2014
New Revision: 1568028
URL: http://svn.apache.org/r1568028
Log:
GORA-245v5
Modified:
gora/branches/GORA_94/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraColumn.java
gora/branches/GORA_94/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraResult.java
gora/branches/GORA_94/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSubColumn.java
gora/branches/GORA_94/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSuperColumn.java
gora/branches/GORA_94/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/CharSequenceSerializer.java
gora/branches/GORA_94/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/GoraSerializerTypeInferer.java
gora/branches/GORA_94/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java
gora/branches/GORA_94/gora-cassandra/src/test/conf/gora-cassandra-mapping.xml
gora/branches/GORA_94/gora-cassandra/src/test/java/org/apache/gora/cassandra/store/TestCassandraStore.java
Modified:
gora/branches/GORA_94/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraColumn.java
URL:
http://svn.apache.org/viewvc/gora/branches/GORA_94/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraColumn.java?rev=1568028&r1=1568027&r2=1568028&view=diff
==============================================================================
---
gora/branches/GORA_94/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraColumn.java
(original)
+++
gora/branches/GORA_94/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraColumn.java
Thu Feb 13 20:19:22 2014
@@ -18,12 +18,18 @@
package org.apache.gora.cassandra.query;
+import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.concurrent.ConcurrentHashMap;
import me.prettyprint.hector.api.Serializer;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
+import org.apache.avro.Schema.Type;
+import org.apache.avro.io.BinaryDecoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.specific.SpecificDatumReader;
import org.apache.gora.cassandra.serializers.GoraSerializerTypeInferer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -41,7 +47,20 @@ public abstract class CassandraColumn {
private int type;
private Field field;
private int unionType;
-
+
+ public static final ThreadLocal<BinaryDecoder> decoders =
+ new ThreadLocal<BinaryDecoder>();
+
+ /*
+ * Cache the datum readers in a ConcurrentHashMap keyed by schema full name.
+ * Readers and writers are not thread safe, at least not before Avro 1.4.0
+ * (See AVRO-650). When they are thread safe, it is possible to maintain a
+ * single reader and writer pair for every schema, instead of one per thread.
+ */
+
+ public static final ConcurrentHashMap<String, SpecificDatumReader<?>>
readerMap =
+ new ConcurrentHashMap<String, SpecificDatumReader<?>>();
+
public void setUnionType(int pUnionType){
this.unionType = pUnionType;
}
@@ -72,7 +91,8 @@ public abstract class CassandraColumn {
public abstract ByteBuffer getName();
public abstract Object getValue();
-
+
+ @SuppressWarnings({ "rawtypes" })
protected Object fromByteBuffer(Schema schema, ByteBuffer byteBuffer) {
Object value = null;
Serializer<?> serializer = GoraSerializerTypeInferer.getSerializer(schema);
@@ -81,6 +101,32 @@ public abstract class CassandraColumn {
+ "could be found. Please report this to [email protected]");
} else {
value = serializer.fromByteBuffer(byteBuffer);
+ if (schema.getType().equals(Type.RECORD)){
+ String schemaId = schema.getFullName();
+
+ SpecificDatumReader<?> reader =
(SpecificDatumReader<?>)readerMap.get(schemaId);
+ if (reader == null) {
+ reader = new SpecificDatumReader(schema);// ignore dirty bits
+ SpecificDatumReader localReader=null;
+ if((localReader=readerMap.putIfAbsent(schemaId, reader))!=null) {
+ reader = localReader;
+ }
+ }
+
+ // initialize a decoder, possibly reusing previous one
+ BinaryDecoder decoderFromCache = decoders.get();
+ BinaryDecoder decoder =
DecoderFactory.get().binaryDecoder((byte[])value, null);
+ // put in threadlocal cache if the initial get was empty
+ if (decoderFromCache==null) {
+ decoders.set(decoder);
+ }
+ try {
+ value = reader.read(null, decoder);
+ } catch (IOException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }
}
return value;
}
Modified:
gora/branches/GORA_94/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraResult.java
URL:
http://svn.apache.org/viewvc/gora/branches/GORA_94/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraResult.java?rev=1568028&r1=1568027&r2=1568028&view=diff
==============================================================================
---
gora/branches/GORA_94/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraResult.java
(original)
+++
gora/branches/GORA_94/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraResult.java
Thu Feb 13 20:19:22 2014
@@ -72,7 +72,7 @@ public class CassandraResult<K, T extend
for (int iCnt = 0; iCnt < pCassandraRow.length; iCnt++){
CassandraColumn cColumn = (CassandraColumn)pCassandraRow[iCnt];
- String columnName =
StringSerializer.get().fromByteBuffer(cColumn.getName());
+ String columnName =
StringSerializer.get().fromByteBuffer(cColumn.getName().duplicate());
if (pFieldName.equals(columnName))
return cColumn;
}
@@ -95,46 +95,50 @@ public class CassandraResult<K, T extend
List<Field> fields = schema.getFields();
for (CassandraColumn cassandraColumn: cassandraRow) {
-
// get field name
- String family = cassandraColumn.getFamily();
- String fieldName = this.reverseMap.get(family + ":" +
- StringSerializer.get().fromByteBuffer(cassandraColumn.getName()));
+ String family = cassandraColumn.getFamily();
+
+ String fieldName = this.reverseMap.get(family + ":" +
StringSerializer.get().fromByteBuffer(cassandraColumn.getName().duplicate()));
- if (fieldName != null ){
+ if (fieldName != null) {
// get field
- int pos = this.persistent.getSchema().getField(fieldName).pos();
- Field field = fields.get(pos);
- Type fieldType = field.schema().getType();
-
//LOG.info(StringSerializer.get().fromByteBuffer(cassandraColumn.getName()) +
fieldName + " " + fieldType.name());
- if (fieldType == Type.UNION){
- // getting UNION stored type
- int posUnionType = this.persistent.getSchema().getField(fieldName +
CassandraStore.UNION_COL_SUFIX).pos();
- Field fieldUnionType = fields.get(posUnionType);
- CassandraColumn cc = getUnionTypeColumn(fieldName +
CassandraStore.UNION_COL_SUFIX, cassandraRow.toArray());
- // get value of UNION stored type
- cc.setField(fieldUnionType);
- Object val = cc.getValue();
- this.persistent.put(posUnionType, val);
+ if (fieldName.indexOf(CassandraStore.UNION_COL_SUFIX) < 0) {
+
+ int pos = this.persistent.getSchema().getField(fieldName).pos();
+ Field field = fields.get(pos);
+ Type fieldType = field.schema().getType();
+ //
LOG.info(StringSerializer.get().fromByteBuffer(cassandraColumn.getName())
+ // + fieldName + " " + fieldType.name());
+ if (fieldType.equals(Type.UNION)) {
+ //getting UNION stored type
+ CassandraColumn cc = getUnionTypeColumn(fieldName
+ + CassandraStore.UNION_COL_SUFIX, cassandraRow.toArray());
+ //creating temporary UNION Field
+ Field unionField = new Field(fieldName
+ + CassandraStore.UNION_COL_SUFIX, Schema.create(Type.INT),
+ null, null);
+ // get value of UNION stored type
+ cc.setField(unionField);
+ Object val = cc.getValue();
+ cassandraColumn.setUnionType(Integer.parseInt(val.toString()));
+ }
+
+ // get value
+ cassandraColumn.setField(field);
+ Object value = cassandraColumn.getValue();
+
+ this.persistent.put(pos, value);
// this field does not need to be written back to the store
- this.persistent.clearDirty(posUnionType);
- cassandraColumn.setUnionType(Integer.parseInt(val.toString()));
+ this.persistent.clearDirty(pos);
}
-
- // get value
- cassandraColumn.setField(field);
- Object value = cassandraColumn.getValue();
-
- this.persistent.put(pos, value);
- // this field does not need to be written back to the store
- this.persistent.clearDirty(pos);
- }
- else
+ } else
LOG.debug("FieldName was null while iterating CassandraRow and using
Avro Union type");
}
}
+ //TODO Should we remove this method?
+ @SuppressWarnings("unused")
private int getNonNullTypePos(List<Schema> pTypes){
int iCnt = 0;
for (Schema sch : pTypes)
Modified:
gora/branches/GORA_94/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSubColumn.java
URL:
http://svn.apache.org/viewvc/gora/branches/GORA_94/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSubColumn.java?rev=1568028&r1=1568027&r2=1568028&view=diff
==============================================================================
---
gora/branches/GORA_94/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSubColumn.java
(original)
+++
gora/branches/GORA_94/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSubColumn.java
Thu Feb 13 20:19:22 2014
@@ -19,8 +19,6 @@
package org.apache.gora.cassandra.query;
import java.nio.ByteBuffer;
-import java.nio.charset.Charset;
-import java.nio.charset.CharsetEncoder;
import java.util.List;
import java.util.Map;
@@ -38,11 +36,6 @@ import org.slf4j.LoggerFactory;
public class CassandraSubColumn extends CassandraColumn {
public static final Logger LOG =
LoggerFactory.getLogger(CassandraSubColumn.class);
- private static final String ENCODING = "UTF-8";
-
- private static CharsetEncoder charsetEncoder =
Charset.forName(ENCODING).newEncoder();;
-
-
/**
* Key-value pair containing the raw data.
*/
@@ -52,36 +45,46 @@ public class CassandraSubColumn extends
return hColumn.getName();
}
- /**
- * Deserialize a String into an typed Object, according to the field schema.
- * @see org.apache.gora.cassandra.query.CassandraColumn#getValue()
- */
- public Object getValue() {
- Field field = getField();
- Schema fieldSchema = field.schema();
- Type type = fieldSchema.getType();
- ByteBuffer byteBuffer = hColumn.getValue();
- if (byteBuffer == null) {
- return null;
- }
+ private Object getFieldValue(Type type, Schema fieldSchema, ByteBuffer
byteBuffer){
Object value = null;
- if (type == Type.ARRAY) {
+ if (type.equals(Type.ARRAY)) {
ListSerializer<?> serializer =
ListSerializer.get(fieldSchema.getElementType());
List<?> genericArray = serializer.fromByteBuffer(byteBuffer);
value = genericArray;
- } else if (type == Type.MAP) {
+ } else if (type.equals(Type.MAP)) {
MapSerializer<?> serializer =
MapSerializer.get(fieldSchema.getValueType());
Map<?, ?> map = serializer.fromByteBuffer(byteBuffer);
value = map;
- } else if (type == Type.UNION){
+ } else if (type.equals(Type.RECORD)){
+ value = fromByteBuffer(fieldSchema, byteBuffer);
+ //TODO: This needs to be read back from Avro.
+ } else if (type.equals(Type.UNION)){
// the selected union schema is obtained
- Schema unionFieldSchema = getUnionSchema(super.getUnionType(),
field.schema());
+ Schema unionFieldSchema = getUnionSchema(super.getUnionType(),
fieldSchema);
+ Type unionFieldType = unionFieldSchema.getType();
// we use the selected union schema to deserialize our actual value
- value = fromByteBuffer(unionFieldSchema, byteBuffer);
+ //value = fromByteBuffer(unionFieldSchema, byteBuffer);
+ value = getFieldValue(unionFieldType, unionFieldSchema, byteBuffer);
} else {
value = fromByteBuffer(fieldSchema, byteBuffer);
}
+ return value;
+ }
+
+ /**
+ * Deserialize a String into an typed Object, according to the field schema.
+ * @see org.apache.gora.cassandra.query.CassandraColumn#getValue()
+ */
+ public Object getValue() {
+ Field field = getField();
+ Schema fieldSchema = field.schema();
+ Type type = fieldSchema.getType();
+ ByteBuffer byteBuffer = hColumn.getValue();
+ if (byteBuffer == null) {
+ return null;
+ }
+ Object value = getFieldValue(type, fieldSchema, byteBuffer);
return value;
}
Modified:
gora/branches/GORA_94/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSuperColumn.java
URL:
http://svn.apache.org/viewvc/gora/branches/GORA_94/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSuperColumn.java?rev=1568028&r1=1568027&r2=1568028&view=diff
==============================================================================
---
gora/branches/GORA_94/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSuperColumn.java
(original)
+++
gora/branches/GORA_94/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSuperColumn.java
Thu Feb 13 20:19:22 2014
@@ -32,6 +32,7 @@ import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
import org.apache.avro.Schema.Type;
import org.apache.gora.cassandra.serializers.CharSequenceSerializer;
+import org.apache.gora.cassandra.store.CassandraStore;
import org.apache.gora.persistency.impl.PersistentBase;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -45,11 +46,7 @@ public class CassandraSuperColumn extend
return StringSerializer.get().toByteBuffer(hSuperColumn.getName());
}
- public Object getValue() {
- Field field = getField();
- Schema fieldSchema = field.schema();
- Type type = fieldSchema.getType();
-
+ private Object getSuperValue(Field field, Schema fieldSchema, Type type){
Object value = null;
switch (type) {
@@ -102,21 +99,76 @@ public class CassandraSuperColumn extend
for (HColumn<ByteBuffer, ByteBuffer> hColumn :
this.hSuperColumn.getColumns()) {
String memberName =
StringSerializer.get().fromByteBuffer(hColumn.getName());
+ if (memberName.indexOf(CassandraStore.UNION_COL_SUFIX) < 0) {
+
if (memberName == null || memberName.length() == 0) {
LOG.warn("member name is null or empty.");
continue;
}
Field memberField = fieldSchema.getField(memberName);
+ Schema memberSchema = memberField.schema();
+ Type memberType = memberSchema.getType();
+
CassandraSubColumn cassandraColumn = new CassandraSubColumn();
cassandraColumn.setField(memberField);
cassandraColumn.setValue(hColumn);
+
+ if (memberType.equals(Type.UNION)){
+ HColumn<ByteBuffer, ByteBuffer> hc =
getUnionTypeColumn(memberField.name()
+ + CassandraStore.UNION_COL_SUFIX,
this.hSuperColumn.getColumns().toArray());
+ Field unionField = new Field(memberField.name()
+ + CassandraStore.UNION_COL_SUFIX, Schema.create(Type.INT),
+ null, null);
+
+ CassandraSubColumn unionColumn = new CassandraSubColumn();
+
+ // get value of UNION stored type
+ unionColumn.setField(unionField);
+ unionColumn.setValue(hc);
+ Object val = unionColumn.getValue();
+ cassandraColumn.setUnionType(Integer.parseInt(val.toString()));
+ }
+
record.put(record.getSchema().getField(memberName).pos(),
cassandraColumn.getValue());
}
+ }
}
break;
+ case UNION:
+ int schemaPos = this.getUnionType();
+ Schema unioSchema = fieldSchema.getTypes().get(schemaPos);
+ Type unionType = unioSchema.getType();
+ value = getSuperValue(field, unioSchema, unionType);
+ break;
default:
+ Object memberValue = null;
+ // Used to get the value of the UnionIndex column of a Union type field. UnionIndex
is always an Integer.
+ for (HColumn<ByteBuffer, ByteBuffer> hColumn :
this.hSuperColumn.getColumns()) {
+ memberValue = fromByteBuffer(fieldSchema, hColumn.getValue());
+ }
+ value = memberValue;
LOG.warn("Type: " + type.name() + " not supported for field: " +
field.name());
}
+ return value;
+ }
+
+ private HColumn<ByteBuffer, ByteBuffer> getUnionTypeColumn(String fieldName,
Object[] hColumns) {
+ for (int iCnt = 0; iCnt < hColumns.length; iCnt++){
+ @SuppressWarnings("unchecked")
+ HColumn<ByteBuffer, ByteBuffer> hColumn = (HColumn<ByteBuffer,
ByteBuffer>)hColumns[iCnt];
+ String columnName =
StringSerializer.get().fromByteBuffer(hColumn.getNameBytes().duplicate());
+ if (fieldName.equals(columnName))
+ return hColumn;
+ }
+ return null;
+}
+
+ public Object getValue() {
+ Field field = getField();
+ Schema fieldSchema = field.schema();
+ Type type = fieldSchema.getType();
+
+ Object value = getSuperValue(field, fieldSchema, type);
return value;
}
Modified:
gora/branches/GORA_94/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/CharSequenceSerializer.java
URL:
http://svn.apache.org/viewvc/gora/branches/GORA_94/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/CharSequenceSerializer.java?rev=1568028&r1=1568027&r2=1568028&view=diff
==============================================================================
---
gora/branches/GORA_94/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/CharSequenceSerializer.java
(original)
+++
gora/branches/GORA_94/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/CharSequenceSerializer.java
Thu Feb 13 20:19:22 2014
@@ -22,6 +22,8 @@ import static me.prettyprint.hector.api.
import java.nio.ByteBuffer;
+import org.apache.avro.util.Utf8;
+
import me.prettyprint.cassandra.serializers.AbstractSerializer;
import me.prettyprint.cassandra.serializers.StringSerializer;
import me.prettyprint.hector.api.ddl.ComparatorType;
@@ -46,11 +48,12 @@ public final class CharSequenceSerialize
}
@Override
- public CharSequence fromByteBuffer(ByteBuffer byteBuffer) {
+ //TODO: Returning CharSequence causes test failures, because all tests use Utf8. When the tests
change the type they set, this will be CharSequence again.
+ public Utf8 fromByteBuffer(ByteBuffer byteBuffer) {
if (byteBuffer == null) {
return null;
}
- return StringSerializer.get().fromByteBuffer(byteBuffer);
+ return new Utf8(StringSerializer.get().fromByteBuffer(byteBuffer));
}
@Override
Modified:
gora/branches/GORA_94/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/GoraSerializerTypeInferer.java
URL:
http://svn.apache.org/viewvc/gora/branches/GORA_94/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/GoraSerializerTypeInferer.java?rev=1568028&r1=1568027&r2=1568028&view=diff
==============================================================================
---
gora/branches/GORA_94/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/GoraSerializerTypeInferer.java
(original)
+++
gora/branches/GORA_94/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/GoraSerializerTypeInferer.java
Thu Feb 13 20:19:22 2014
@@ -28,6 +28,7 @@ import me.prettyprint.cassandra.serializ
import me.prettyprint.cassandra.serializers.FloatSerializer;
import me.prettyprint.cassandra.serializers.IntegerSerializer;
import me.prettyprint.cassandra.serializers.LongSerializer;
+import me.prettyprint.cassandra.serializers.ObjectSerializer;
import me.prettyprint.cassandra.serializers.SerializerTypeInferer;
import me.prettyprint.cassandra.serializers.StringSerializer;
import me.prettyprint.hector.api.Serializer;
@@ -37,6 +38,7 @@ import org.apache.avro.Schema.Type;
import org.apache.avro.generic.GenericArray;
import org.apache.avro.specific.SpecificFixed;
import org.apache.avro.util.Utf8;
+import org.apache.gora.persistency.Persistent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -89,7 +91,10 @@ public class GoraSerializerTypeInferer {
Schema schema = TypeUtils.getSchema(value0);
serializer = MapSerializer.get(schema);
}
- } else {
+ } else if (value instanceof Persistent){
+ serializer = ObjectSerializer.get();
+ }
+ else {
serializer = SerializerTypeInferer.getSerializer(value);
}
return serializer;
@@ -124,30 +129,32 @@ public class GoraSerializerTypeInferer {
public static <T> Serializer<T> getSerializer(Schema schema) {
Serializer serializer = null;
Type type = schema.getType();
- if (type == Type.STRING) {
+ if (type.equals(Type.STRING)) {
serializer = CharSequenceSerializer.get();
- } else if (type == Type.BOOLEAN) {
+ } else if (type.equals(Type.BOOLEAN)) {
serializer = BooleanSerializer.get();
- } else if (type == Type.BYTES) {
+ } else if (type.equals(Type.BYTES)) {
serializer = ByteBufferSerializer.get();
- } else if (type == Type.DOUBLE) {
+ } else if (type.equals(Type.DOUBLE)) {
serializer = DoubleSerializer.get();
- } else if (type == Type.FLOAT) {
+ } else if (type.equals(Type.FLOAT)) {
serializer = FloatSerializer.get();
- } else if (type == Type.INT) {
+ } else if (type.equals(Type.INT)) {
serializer = IntegerSerializer.get();
- } else if (type == Type.LONG) {
+ } else if (type.equals(Type.LONG)) {
serializer = LongSerializer.get();
- } else if (type == Type.FIXED) {
+ } else if (type.equals(Type.FIXED)) {
Class clazz = TypeUtils.getClass(schema);
serializer = SpecificFixedSerializer.get(clazz);
// serializer = SpecificFixedSerializer.get(schema);
- } else if (type == Type.ARRAY) {
+ } else if (type.equals(Type.ARRAY)) {
serializer = ListSerializer.get(schema.getElementType());
- } else if (type == Type.MAP) {
+ } else if (type.equals(Type.MAP)) {
serializer = MapSerializer.get(schema.getValueType());
- } else if (type == Type.UNION){
+ } else if (type.equals(Type.UNION)){
serializer = ByteBufferSerializer.get();
+ } else if (type.equals(Type.RECORD)){
+ serializer = BytesArraySerializer.get();
} else {
serializer = null;
}
Modified:
gora/branches/GORA_94/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java
URL:
http://svn.apache.org/viewvc/gora/branches/GORA_94/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java?rev=1568028&r1=1568027&r2=1568028&view=diff
==============================================================================
---
gora/branches/GORA_94/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java
(original)
+++
gora/branches/GORA_94/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java
Thu Feb 13 20:19:22 2014
@@ -18,6 +18,7 @@
package org.apache.gora.cassandra.store;
+import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@@ -28,6 +29,7 @@ import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.Collections;
+import java.util.concurrent.ConcurrentHashMap;
import me.prettyprint.hector.api.beans.ColumnSlice;
import me.prettyprint.hector.api.beans.HColumn;
@@ -40,7 +42,13 @@ import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
import org.apache.avro.Schema.Type;
import org.apache.avro.generic.GenericArray;
+import org.apache.avro.generic.GenericData.Array;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificData;
+import org.apache.avro.specific.SpecificDatumWriter;
+import org.apache.avro.util.Utf8;
+import org.apache.commons.lang.ArrayUtils;
import org.apache.gora.cassandra.query.CassandraQuery;
import org.apache.gora.cassandra.query.CassandraResult;
import org.apache.gora.cassandra.query.CassandraResultSet;
@@ -48,6 +56,7 @@ import org.apache.gora.cassandra.query.C
import org.apache.gora.cassandra.query.CassandraSubColumn;
import org.apache.gora.cassandra.query.CassandraSuperColumn;
import org.apache.gora.persistency.Persistent;
+import org.apache.gora.persistency.impl.DirtyListWrapper;
import org.apache.gora.persistency.impl.PersistentBase;
import org.apache.gora.query.PartitionQuery;
import org.apache.gora.query.Query;
@@ -60,7 +69,7 @@ import org.slf4j.LoggerFactory;
/**
* {@link org.apache.gora.cassandra.store.CassandraStore} is the primary class
* responsible for directing Gora CRUD operations into Cassandra. We
(delegate) rely
- * heavily on {@ link org.apache.gora.cassandra.store.CassandraClient} for
many operations
+ * heavily on {@link org.apache.gora.cassandra.store.CassandraClient} for many
operations
* such as initialization, creating and deleting schemas (Cassandra
Keyspaces), etc.
*/
public class CassandraStore<K, T extends PersistentBase> extends
DataStoreBase<K, T> {
@@ -71,13 +80,13 @@ public class CassandraStore<K, T extends
private CassandraClient<K, T> cassandraClient = new CassandraClient<K, T>();
/**
- * Fixed string used to generate an extra column based on
+ * Fixed string with value "UnionIndex" used to generate an extra column
based on
* the original field's name
*/
public static String UNION_COL_SUFIX = "UnionIndex";
/**
- * Default schema index used when AVRO Union data types are stored
+ * Default schema index with value "0" used when AVRO Union data types are
stored
*/
public static int DEFAULT_UNION_SCHEMA = 0;
@@ -90,9 +99,29 @@ public class CassandraStore<K, T extends
*/
private Map<K, T> buffer = Collections.synchronizedMap(new LinkedHashMap<K,
T>());
+ /**
+ * Threadlocals maintaining reusable binary decoders and encoders.
+ */
+ private static ThreadLocal<ByteArrayOutputStream> outputStream =
+ new ThreadLocal<ByteArrayOutputStream>();
+
+ public static final ThreadLocal<BinaryEncoder> encoders =
+ new ThreadLocal<BinaryEncoder>();
+
+ /**
+ * Create a {@link java.util.concurrent.ConcurrentHashMap} for the
+ * datum readers and writers.
+ * This is necessary because they are not thread safe, at least not before
+ * Avro 1.4.0 (See AVRO-650).
+ * When they are thread safe, it is possible to maintain a single reader and
+ * writer pair for every schema, instead of one for every thread.
+ * @see <a href="https://issues.apache.org/jira/browse/AVRO-650">AVRO-650</a>
+ */
+ public static final ConcurrentHashMap<String, SpecificDatumWriter<?>>
writerMap =
+ new ConcurrentHashMap<String, SpecificDatumWriter<?>>();
+
/** The default constructor for CassandraStore */
public CassandraStore() throws Exception {
- // this.cassandraClient.initialize();
}
/**
@@ -186,7 +215,7 @@ public class CassandraStore<K, T extends
* partitioned across nodes based on row Key.
*/
private void addSubColumns(String family, CassandraQuery<K, T>
cassandraQuery,
- CassandraResultSet cassandraResultSet) {
+ CassandraResultSet<K> cassandraResultSet) {
// select family columns that are included in the query
List<Row<K, ByteBuffer, ByteBuffer>> rows =
this.cassandraClient.execute(cassandraQuery, family);
@@ -219,14 +248,14 @@ public class CassandraStore<K, T extends
* partitioned across nodes based on row Key.
*/
private void addSuperColumns(String family, CassandraQuery<K, T>
cassandraQuery,
- CassandraResultSet cassandraResultSet) {
+ CassandraResultSet<K> cassandraResultSet) {
List<SuperRow<K, String, ByteBuffer, ByteBuffer>> superRows =
this.cassandraClient.executeSuper(cassandraQuery, family);
for (SuperRow<K, String, ByteBuffer, ByteBuffer> superRow: superRows) {
K key = superRow.getKey();
CassandraRow<K> cassandraRow = cassandraResultSet.getRow(key);
if (cassandraRow == null) {
- cassandraRow = new CassandraRow();
+ cassandraRow = new CassandraRow<K>();
cassandraResultSet.putRow(key, cassandraRow);
cassandraRow.setKey(key);
}
@@ -270,7 +299,7 @@ public class CassandraStore<K, T extends
for (Field field: schema.getFields()) {
if (value.isDirty(field.pos())) {
- addOrUpdateField(key, field, value.get(field.pos()));
+ addOrUpdateField(key, field, field.schema(), value.get(field.pos()));
}
}
}
@@ -287,7 +316,22 @@ public class CassandraStore<K, T extends
CassandraQuery<K,T> query = new CassandraQuery<K,T>();
query.setDataStore(this);
query.setKeyRange(key, key);
- query.setFields(fields);
+
+
+ // Generating UnionFields
+ ArrayList<String> unionFields = new ArrayList<String>();
+ for (String field: fields){
+ Field schemaField =this.fieldMap.get(field);
+ Type type = schemaField.schema().getType();
+ if (type.getName().equals("UNION".toLowerCase())){
+ unionFields.add(field+UNION_COL_SUFIX);
+ }
+ }
+
+ String[] arr = unionFields.toArray(new String[unionFields.size()]);
+ String[] both = (String[]) ArrayUtils.addAll(fields, arr);
+
+ query.setFields(both);
query.setLimit(1);
Result<K,T> result = execute(query);
boolean hasResult = false;
@@ -339,17 +383,21 @@ public class CassandraStore<K, T extends
* <li>Create a new duplicate instance of the object (explained in more
detail below) **.</li>
* <li>Obtain a {@link java.util.List} of the {@link org.apache.avro.Schema}
* {@link org.apache.avro.Schema.Field}'s.</li>
- * <li>Iterate through the {@link java.util.List}. This allows us to process
- * each item appropriately.</li>
- * <li>Check to see if the {@link org.apache.avro.Schema.Field} is either at
- * position 0 OR it is NOT dirty. If one of these conditions is true then we
DO NOT
- * process this field.</li>
- * <li>Obtain the element at the specified position in this list.</li>
+ * <li>Iterate through the field {@link java.util.List}. This allows us to
+ * consequently process each item.</li>
+ * <li>Check to see if the {@link org.apache.avro.Schema.Field} is NOT
dirty.
+ * If this condition is true then we DO NOT process this field.</li>
+ * <li>Obtain the element at the specified position in this list so we can
+ * directly operate on it.</li>
* <li>Obtain the {@link org.apache.avro.Schema.Type} of the element
obtained
- * above and process it accordingly. N.B. For nested type RECORD we shadow
- * the checks to see if the {@link org.apache.avro.Schema.Field} is either
at
+ * above and process it accordingly. N.B. For nested type ARRAY, MAP
+ * RECORD or UNION, we shadow the checks in bullet point 5 above to infer
that the
+ * {@link org.apache.avro.Schema.Field} is either at
* position 0 OR it is NOT dirty. If one of these conditions is true then we
DO NOT
- * process this field.</li>
+ * process this field. This is carried out in
+ * {@link
org.apache.gora.cassandra.store.CassandraStore#getFieldValue(Schema, Type,
Object)}</li>
+ * <li>We then insert the Key and Object into the {@link
java.util.LinkedHashMap} buffer
+ * before being flushed. This performs a structural modification of the
map.</li>
* </ol>
* ** We create a duplicate instance of the object to be persisted and
insert processed
* objects into a synchronized {@link java.util.LinkedHashMap}. This allows
@@ -372,37 +420,9 @@ public class CassandraStore<K, T extends
Field field = fields.get(i);
Type type = field.schema().getType();
Object fieldValue = value.get(field.pos());
+ Schema fieldSchema = field.schema();
// check if field has a nested structure (array, map, record or union)
-
- switch(type) {
- case RECORD:
- Persistent persistent = (Persistent) fieldValue;
- Persistent newRecord = (Persistent)
SpecificData.get().newRecord(persistent, persistent.getSchema());
- for (Field member: field.schema().getFields()) {
- if (member.pos() == 0 || !persistent.isDirty()) {
- continue;
- }
- newRecord.put(member.pos(), persistent.get(member.pos()));
- }
- fieldValue = newRecord;
- break;
- case MAP:
- Map<?, ?> map = (Map<?, ?>) fieldValue;
- fieldValue = map;
- break;
- case ARRAY:
- fieldValue = (List<?>) fieldValue;
- break;
- case UNION:
- // storing the union selected schema, the actual value will
- // be stored as soon as we get break out.
- int schemaPos = getUnionSchema(fieldValue,field.schema());
- p.put( schemaPos, p.getSchema().getField(field.name() +
CassandraStore.UNION_COL_SUFIX));
- //p.put(fieldPos, fieldValue);
- break;
- default:
- break;
- }
+ fieldValue = getFieldValue(fieldSchema, type, fieldValue);
p.put(field.pos(), fieldValue);
}
// this performs a structural modification of the map
@@ -410,14 +430,64 @@ public class CassandraStore<K, T extends
}
/**
+ * For every field within an object, we pass in a field schema, Type and value.
+ * This enables us to process fields (based on their characteristics),
+ * preparing them for persistence.
+ * @param fieldSchema the associated field schema
+ * @param type the field type
+ * @param fieldValue the field value.
+ * @return the field value after processing, ready to be persisted
+ */
+ private Object getFieldValue(Schema fieldSchema, Type type, Object
fieldValue ){
+ switch(type) {
+ case RECORD:
+ Persistent persistent = (Persistent) fieldValue;
+ Persistent newRecord = (Persistent)
SpecificData.get().newRecord(persistent, persistent.getSchema());
+ for (Field member: fieldSchema.getFields()) {
+ if (member.pos() == 0 || !persistent.isDirty()) {
+ continue;
+ }
+ Schema memberSchema = member.schema();
+ Type memberType = memberSchema.getType();
+ Object memberValue = persistent.get(member.pos());
+ newRecord.put(member.pos(), getFieldValue(memberSchema, memberType,
memberValue));
+ }
+ fieldValue = newRecord;
+ break;
+ case MAP:
+ Map<?, ?> map = (Map<?, ?>) fieldValue;
+ fieldValue = map;
+ break;
+ case ARRAY:
+ fieldValue = (List<?>) fieldValue;
+ break;
+ case UNION:
+ // resolve which branch of the union a non-null value belongs to, then
+ // process the value recursively using that branch's schema and type.
+ if (fieldValue != null){
+ int schemaPos = getUnionSchema(fieldValue,fieldSchema);
+ Schema unionSchema = fieldSchema.getTypes().get(schemaPos);
+ Type unionType = unionSchema.getType();
+ fieldValue = getFieldValue(unionSchema, unionType, fieldValue);
+ }
+ //p.put( schemaPos, p.getSchema().getField(field.name() +
CassandraStore.UNION_COL_SUFIX));
+ //p.put(fieldPos, fieldValue);
+ break;
+ default:
+ break;
+ }
+ return fieldValue;
+ }
+
+ /**
* Add a field to Cassandra according to its type.
* @param key the key of the row where the field should be added
* @param field the Avro field representing a datum
* @param value the field value
*/
- @SuppressWarnings({ "unchecked", "null" })
- private void addOrUpdateField(K key, Field field, Object value) {
- Schema schema = field.schema();
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ private void addOrUpdateField(K key, Field field, Schema schema, Object
value) {
+ //Schema schema = field.schema();
Type type = schema.getType();
// checking if the value to be updated is used for saving union schema
if (field.name().indexOf(CassandraStore.UNION_COL_SUFIX) < 0){
@@ -436,22 +506,74 @@ public class CassandraStore<K, T extends
if (value != null) {
if (value instanceof PersistentBase) {
PersistentBase persistentBase = (PersistentBase) value;
- for (Field member: schema.getFields()) {
-
- // TODO: hack, do not store empty arrays
- Object memberValue = persistentBase.get(member.pos());
- if (memberValue instanceof List<?>) {
- if (((List<?>)memberValue).size() == 0) {
- continue;
- }
- } else if (memberValue instanceof Map<?,?>) {
- if (((Map<?, ?>)memberValue).size() == 0) {
- continue;
- }
- }
- this.cassandraClient.addSubColumn(key, field.name(),
- member.name(), memberValue);
+
+ SpecificDatumWriter writer = (SpecificDatumWriter<?>)
writerMap.get(schema.getFullName());
+ if (writer == null) {
+ writer = new SpecificDatumWriter(schema);// ignore dirty bits
+ writerMap.put(schema.getFullName(),writer);
+ }
+
+ BinaryEncoder encoderFromCache = encoders.get();
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ outputStream.set(bos);
+ BinaryEncoder encoder =
EncoderFactory.get().directBinaryEncoder(bos, null);
+ if (encoderFromCache == null) {
+ encoders.set(encoder);
}
+
+ //reset the buffers
+ ByteArrayOutputStream os = outputStream.get();
+ os.reset();
+
+ try {
+ writer.write(persistentBase, encoder);
+ encoder.flush();
+ } catch (IOException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ byte[] byteValue = os.toByteArray();
+
+ //String familyName =
this.cassandraClient.getCassandraMapping().getFamily(field.name());
+// if (this.cassandraClient.isSuper( familyName )){
+// this.cassandraClient.addSubColumn(key, columnName, columnName,
schemaPos);
+// }else{
+//
+//
+// }
+ this.cassandraClient.addColumn(key, field.name(), byteValue);
+
+// for (Field member: schema.getFields()) {
+// if (member.pos() == 0) {
+// continue;
+// }
+// // TODO: hack, do not store empty arrays
+// Object memberValue = persistentBase.get(member.pos());
+// if (memberValue instanceof List<?>) {
+// if (((List<?>)memberValue).size() == 0) {
+// continue;
+// }
+// } else if (memberValue instanceof Map<?,?>) {
+// if (((Map<?, ?>)memberValue).size() == 0) {
+// continue;
+// }
+// }
+// if (memberValue == null){
+// continue;
+// }
+//
+// // Get type for Union Fields
+// Schema memberSchema = member.schema();
+// Type fieldType = memberSchema.getType();
+// if (fieldType.equals(Type.UNION)){
+// int schemaPos = getUnionSchema(memberValue, memberSchema);
+// this.cassandraClient.addSubColumn(key, field.name(),
+// member.name()+UNION_COL_SUFIX, schemaPos);
+// }
+//
+// this.cassandraClient.addSubColumn(key, field.name(),
+// member.name(), memberValue);
+// }
} else {
LOG.warn("Record with value: " + value.toString() + " not
supported for field: " + field.name());
}
@@ -468,8 +590,13 @@ public class CassandraStore<K, T extends
break;
case ARRAY:
if (value != null) {
- if (value instanceof GenericArray<?>) {
- this.cassandraClient.addGenericArray(key, field.name(),
(GenericArray<?>)value);
+ if (value instanceof DirtyListWrapper<?>) {
+ DirtyListWrapper fieldValue = (DirtyListWrapper<?>)value;
+ GenericArray valueArray = new Array(fieldValue.size(), schema);
+ for (int i = 0; i < fieldValue.size(); i++) {
+ valueArray.add(i, fieldValue.get(i));
+ }
+ this.cassandraClient.addGenericArray(key, field.name(),
(GenericArray<?>)valueArray);
} else {
LOG.warn("Array with value: " + value.toString() + " not supported
for field: " + field.name());
}
@@ -477,14 +604,23 @@ public class CassandraStore<K, T extends
break;
case UNION:
if(value != null) {
- LOG.debug("Union with value: " + value.toString() + " at index: " +
getUnionSchema(value, schema) + " supported for field: " + field.name());
+ int schemaPos = getUnionSchema(value, schema);
+ LOG.debug("Union with value: " + value.toString() + " at index: " +
schemaPos + " supported for field: " + field.name());
// adding union schema index
String columnName = field.name() + UNION_COL_SUFIX;
String familyName =
this.cassandraClient.getCassandraMapping().getFamily(field.name());
this.cassandraClient.getCassandraMapping().addColumn(familyName,
columnName, columnName);
- this.cassandraClient.addColumn(key, columnName,
getUnionSchema(value, schema));
+ if (this.cassandraClient.isSuper( familyName )){
+ this.cassandraClient.addSubColumn(key, columnName, columnName,
schemaPos);
+ }else{
+ this.cassandraClient.addColumn(key, columnName, schemaPos);
+
+ }
+// this.cassandraClient.getCassandraMapping().addColumn(familyName,
columnName, columnName);
// adding union value
- this.cassandraClient.addColumn(key, field.name(), value);
+ Schema unioSchema = schema.getTypes().get(schemaPos);
+ addOrUpdateField(key, field, unioSchema, value);
+ //this.cassandraClient.addColumn(key, field.name(), value);
} else {
LOG.warn("Union with 'null' value not supported for field: " +
field.name());
}
@@ -506,24 +642,30 @@ public class CassandraStore<K, T extends
*/
private int getUnionSchema(Object pValue, Schema pUnionSchema){
int unionSchemaPos = 0;
- String valueType = pValue.getClass().getSimpleName();
+// String valueType = pValue.getClass().getSimpleName();
Iterator<Schema> it = pUnionSchema.getTypes().iterator();
while ( it.hasNext() ){
- String schemaName = it.next().getName();
- if (valueType.equals("Utf8") &&
schemaName.equals(Type.STRING.name().toLowerCase()))
+ Type schemaType = it.next().getType();
+ if (pValue instanceof Utf8 && schemaType.equals(Type.STRING))
return unionSchemaPos;
- else if (valueType.equals("HeapByteBuffer") &&
schemaName.equals(Type.STRING.name().toLowerCase()))
+ else if (pValue instanceof ByteBuffer && schemaType.equals(Type.BYTES))
return unionSchemaPos;
- else if (valueType.equals("Integer") &&
schemaName.equals(Type.INT.name().toLowerCase()))
+ else if (pValue instanceof Integer && schemaType.equals(Type.INT))
return unionSchemaPos;
- else if (valueType.equals("Long") &&
schemaName.equals(Type.LONG.name().toLowerCase()))
+ else if (pValue instanceof Long && schemaType.equals(Type.LONG))
return unionSchemaPos;
- else if (valueType.equals("Double") &&
schemaName.equals(Type.DOUBLE.name().toLowerCase()))
+ else if (pValue instanceof Double && schemaType.equals(Type.DOUBLE))
return unionSchemaPos;
- else if (valueType.equals("Float") &&
schemaName.equals(Type.FLOAT.name().toLowerCase()))
+ else if (pValue instanceof Float && schemaType.equals(Type.FLOAT))
return unionSchemaPos;
- else if (valueType.equals("Boolean") &&
schemaName.equals(Type.BOOLEAN.name().toLowerCase()))
+ else if (pValue instanceof Boolean && schemaType.equals(Type.BOOLEAN))
return unionSchemaPos;
+ else if (pValue instanceof Map && schemaType.equals(Type.MAP))
+ return unionSchemaPos;
+ else if (pValue instanceof List && schemaType.equals(Type.ARRAY))
+ return unionSchemaPos;
+ else if (pValue instanceof Persistent && schemaType.equals(Type.RECORD))
+ return unionSchemaPos;
unionSchemaPos ++;
}
// if we weren't able to determine which data type it is, then we return
the default
Modified:
gora/branches/GORA_94/gora-cassandra/src/test/conf/gora-cassandra-mapping.xml
URL:
http://svn.apache.org/viewvc/gora/branches/GORA_94/gora-cassandra/src/test/conf/gora-cassandra-mapping.xml?rev=1568028&r1=1568027&r2=1568028&view=diff
==============================================================================
---
gora/branches/GORA_94/gora-cassandra/src/test/conf/gora-cassandra-mapping.xml
(original)
+++
gora/branches/GORA_94/gora-cassandra/src/test/conf/gora-cassandra-mapping.xml
Thu Feb 13 20:19:22 2014
@@ -47,7 +47,7 @@
<field name="content" family="p" qualifier="p:cnt:c"/>
<field name="parsedContent" family="sc" qualifier="p:parsedContent"/>
<field name="outlinks" family="sc" qualifier="p:outlinks"/>
- <field name="metadata" family="sc" qualifier="c:mt"/>
+ <field name="metadata" family="p" qualifier="c:mt"/>
</class>
<class name="org.apache.gora.examples.generated.TokenDatum"
keyClass="java.lang.String" keyspace="TokenDatum">
Modified:
gora/branches/GORA_94/gora-cassandra/src/test/java/org/apache/gora/cassandra/store/TestCassandraStore.java
URL:
http://svn.apache.org/viewvc/gora/branches/GORA_94/gora-cassandra/src/test/java/org/apache/gora/cassandra/store/TestCassandraStore.java?rev=1568028&r1=1568027&r2=1568028&view=diff
==============================================================================
---
gora/branches/GORA_94/gora-cassandra/src/test/java/org/apache/gora/cassandra/store/TestCassandraStore.java
(original)
+++
gora/branches/GORA_94/gora-cassandra/src/test/java/org/apache/gora/cassandra/store/TestCassandraStore.java
Thu Feb 13 20:19:22 2014
@@ -68,8 +68,6 @@ public class TestCassandraStore extends
}
- //We need to skip the following tests for a while until we fix some issues..
-
@Ignore("skipped until some bugs are fixed")
@Override
public void testGetWebPageDefaultFields() throws IOException {}
@@ -88,16 +86,16 @@ public class TestCassandraStore extends
@Ignore("skipped until some bugs are fixed")
@Override
public void testQueryWebPageSingleKeyDefaultFields() throws IOException {}
- @Ignore("skipped until some bugs are fixed")
+ @Ignore("GORA-154 delete() and deleteByQuery() methods are not implemented
at CassandraStore, and always returns false or 0")
@Override
public void testDelete() throws IOException {}
- @Ignore("skipped until some bugs are fixed")
+ @Ignore("GORA-154 delete() and deleteByQuery() methods are not implemented
at CassandraStore, and always returns false or 0")
@Override
public void testDeleteByQuery() throws IOException {}
- @Ignore("skipped until some bugs are fixed")
+ @Ignore("GORA-154 delete() and deleteByQuery() methods are not implemented
at CassandraStore, and always returns false or 0")
@Override
public void testDeleteByQueryFields() throws IOException {}
- @Ignore("skipped until some bugs are fixed")
+ @Ignore("GORA-298 Implement CassandraStore#getPartitions")
@Override
public void testGetPartitions() throws IOException {}
@Ignore("skipped until some bugs are fixed")