Author: lewismc
Date: Mon May 21 22:41:20 2012
New Revision: 1341239
URL: http://svn.apache.org/viewvc?rev=1341239&view=rev
Log:
commit to address GORA-131 & 132 respectively and update to CHANGES.txt
Modified:
gora/trunk/CHANGES.txt
gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraResult.java
gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraResultSet.java
gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraRow.java
gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSubColumn.java
gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSuperColumn.java
gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java
gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java
gora/trunk/gora-tutorial/conf/gora.properties
gora/trunk/gora-tutorial/pom.xml
Modified: gora/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/gora/trunk/CHANGES.txt?rev=1341239&r1=1341238&r2=1341239&view=diff
==============================================================================
--- gora/trunk/CHANGES.txt (original)
+++ gora/trunk/CHANGES.txt Mon May 21 22:41:20 2012
@@ -6,6 +6,10 @@ Gora Change Log
0.3 (trunk) Current Development:
+* GORA-131 gora-cassandra should support other key types than String (Kazuomi
Kashii via lewismc)
+
+* GORA-132 Uses ByteBufferSerializer for column value to support various data
types rather than StringSerializer (Kazuomi Kashii via lewismc)
+
* GORA-77 Replace commons logging with Log4j (Renato Javier Marroquín
Mogrovejo via lewismc)
* GORA-134 ListGenericArray's hashCode causes StackOverflowError (Kazuomi
Kashii via lewismc)
Modified:
gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraResult.java
URL:
http://svn.apache.org/viewvc/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraResult.java?rev=1341239&r1=1341238&r2=1341239&view=diff
==============================================================================
---
gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraResult.java
(original)
+++
gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraResult.java
Mon May 21 22:41:20 2012
@@ -36,7 +36,7 @@ public class CassandraResult<K, T extend
private int rowNumber;
- private CassandraResultSet cassandraResultSet;
+ private CassandraResultSet<K> cassandraResultSet;
/**
* Maps Cassandra columns to Avro fields.
@@ -63,10 +63,10 @@ public class CassandraResult<K, T extend
*/
@SuppressWarnings("unchecked")
private void updatePersistent() throws IOException {
- CassandraRow cassandraRow = this.cassandraResultSet.get(this.rowNumber);
+ CassandraRow<K> cassandraRow = this.cassandraResultSet.get(this.rowNumber);
// load key
- this.key = (K) cassandraRow.getKey();
+ this.key = cassandraRow.getKey();
// load value
Schema schema = this.persistent.getSchema();
@@ -104,7 +104,7 @@ public class CassandraResult<K, T extend
return (((float) this.rowNumber) / this.cassandraResultSet.size());
}
- public void setResultSet(CassandraResultSet cassandraResultSet) {
+ public void setResultSet(CassandraResultSet<K> cassandraResultSet) {
this.cassandraResultSet = cassandraResultSet;
}
Modified:
gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraResultSet.java
URL:
http://svn.apache.org/viewvc/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraResultSet.java?rev=1341239&r1=1341238&r2=1341239&view=diff
==============================================================================
---
gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraResultSet.java
(original)
+++
gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraResultSet.java
Mon May 21 22:41:20 2012
@@ -24,7 +24,7 @@ import java.util.HashMap;
/**
* List data structure to keep the order coming from the Cassandra selects.
*/
-public class CassandraResultSet extends ArrayList<CassandraRow> {
+public class CassandraResultSet<K> extends ArrayList<CassandraRow<K>> {
/**
*
@@ -34,9 +34,9 @@ public class CassandraResultSet extends
/**
* Maps keys to indices in the list.
*/
- private HashMap<String, Integer> indexMap = new HashMap<String, Integer>();
+ private HashMap<K, Integer> indexMap = new HashMap<K, Integer>();
- public CassandraRow getRow(String key) {
+ public CassandraRow<K> getRow(K key) {
Integer integer = this.indexMap.get(key);
if (integer == null) {
return null;
@@ -45,7 +45,7 @@ public class CassandraResultSet extends
return this.get(integer);
}
- public void putRow(String key, CassandraRow cassandraRow) {
+ public void putRow(K key, CassandraRow<K> cassandraRow) {
this.add(cassandraRow);
this.indexMap.put(key, this.size()-1);
}
Modified:
gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraRow.java
URL:
http://svn.apache.org/viewvc/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraRow.java?rev=1341239&r1=1341238&r2=1341239&view=diff
==============================================================================
---
gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraRow.java
(original)
+++
gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraRow.java
Mon May 21 22:41:20 2012
@@ -23,19 +23,19 @@ import java.util.ArrayList;
/**
* List of key value pairs representing a row, tagged by a key.
*/
-public class CassandraRow extends ArrayList<CassandraColumn> {
+public class CassandraRow<K> extends ArrayList<CassandraColumn> {
/**
*
*/
private static final long serialVersionUID = -7620939600192859652L;
- private String key;
+ private K key;
- public String getKey() {
+ public K getKey() {
return this.key;
}
- public void setKey(String key) {
+ public void setKey(K key) {
this.key = key;
}
Modified:
gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSubColumn.java
URL:
http://svn.apache.org/viewvc/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSubColumn.java?rev=1341239&r1=1341238&r2=1341239&view=diff
==============================================================================
---
gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSubColumn.java
(original)
+++
gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSubColumn.java
Mon May 21 22:41:20 2012
@@ -24,6 +24,11 @@ import java.nio.charset.CharacterCodingE
import java.nio.charset.Charset;
import java.nio.charset.CharsetEncoder;
+import me.prettyprint.cassandra.serializers.FloatSerializer;
+import me.prettyprint.cassandra.serializers.DoubleSerializer;
+import me.prettyprint.cassandra.serializers.IntegerSerializer;
+import me.prettyprint.cassandra.serializers.LongSerializer;
+import me.prettyprint.cassandra.serializers.StringSerializer;
import me.prettyprint.hector.api.beans.HColumn;
import org.apache.avro.Schema;
@@ -46,7 +51,7 @@ public class CassandraSubColumn extends
/**
* Key-value pair containing the raw data.
*/
- private HColumn<String, String> hColumn;
+ private HColumn<String, ByteBuffer> hColumn;
public String getName() {
return hColumn.getName();
@@ -60,28 +65,31 @@ public class CassandraSubColumn extends
Field field = getField();
Schema fieldSchema = field.schema();
Type type = fieldSchema.getType();
- String valueString = hColumn.getValue();
+ ByteBuffer valueByteBuffer = hColumn.getValue();
Object value = null;
switch (type) {
case STRING:
- value = new Utf8(valueString);
+ value = new
Utf8(StringSerializer.get().fromByteBuffer(valueByteBuffer));
break;
case BYTES:
- // convert string to bytebuffer
- value = getByteBuffer(valueString);
+ value = valueByteBuffer;
break;
case INT:
- value = Integer.parseInt(valueString);
+ value = IntegerSerializer.get().fromByteBuffer(valueByteBuffer);
break;
case LONG:
- value = Long.parseLong(valueString);
+ value = LongSerializer.get().fromByteBuffer(valueByteBuffer);
break;
case FLOAT:
- value = Float.parseFloat(valueString);
+ value = FloatSerializer.get().fromByteBuffer(valueByteBuffer);
+ break;
+ case DOUBLE:
+ value = DoubleSerializer.get().fromByteBuffer(valueByteBuffer);
break;
case ARRAY:
// convert string to array
+ String valueString =
StringSerializer.get().fromByteBuffer(valueByteBuffer);
valueString = valueString.substring(1, valueString.length()-1);
String[] elements = valueString.split(", ");
@@ -106,17 +114,7 @@ public class CassandraSubColumn extends
}
- public void setValue(HColumn<String, String> hColumn) {
+ public void setValue(HColumn<String, ByteBuffer> hColumn) {
this.hColumn = hColumn;
}
-
- public static ByteBuffer getByteBuffer(String valueString) {
- ByteBuffer byteBuffer = null;
- try {
- byteBuffer = charsetEncoder.encode(CharBuffer.wrap(valueString));
- } catch (CharacterCodingException cce) {
- LOG.warn("Unable to encode " + valueString + " into " + ENCODING);
- }
- return byteBuffer;
- }
}
Modified:
gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSuperColumn.java
URL:
http://svn.apache.org/viewvc/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSuperColumn.java?rev=1341239&r1=1341238&r2=1341239&view=diff
==============================================================================
---
gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSuperColumn.java
(original)
+++
gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraSuperColumn.java
Mon May 21 22:41:20 2012
@@ -18,8 +18,14 @@
package org.apache.gora.cassandra.query;
+import java.nio.ByteBuffer;
import java.util.Map;
+import me.prettyprint.cassandra.serializers.FloatSerializer;
+import me.prettyprint.cassandra.serializers.DoubleSerializer;
+import me.prettyprint.cassandra.serializers.IntegerSerializer;
+import me.prettyprint.cassandra.serializers.LongSerializer;
+import me.prettyprint.cassandra.serializers.StringSerializer;
import me.prettyprint.hector.api.beans.HColumn;
import me.prettyprint.hector.api.beans.HSuperColumn;
@@ -35,7 +41,7 @@ import org.slf4j.LoggerFactory;
public class CassandraSuperColumn extends CassandraColumn {
public static final Logger LOG =
LoggerFactory.getLogger(CassandraSuperColumn.class);
- private HSuperColumn<String, String, String> hSuperColumn;
+ private HSuperColumn<String, String, ByteBuffer> hSuperColumn;
public String getName() {
return hSuperColumn.getName();
@@ -53,15 +59,27 @@ public class CassandraSuperColumn extend
Map<Utf8, Object> map = new StatefulHashMap<Utf8, Object>();
Type valueType = fieldSchema.getValueType().getType();
- for (HColumn<String, String> hColumn : this.hSuperColumn.getColumns())
{
- String memberString = hColumn.getValue();
+ for (HColumn<String, ByteBuffer> hColumn :
this.hSuperColumn.getColumns()) {
+ ByteBuffer memberByteBuffer = hColumn.getValue();
Object memberValue = null;
switch (valueType) {
case STRING:
- memberValue = new Utf8(memberString);
+ memberValue = new
Utf8(StringSerializer.get().fromByteBuffer(memberByteBuffer));
break;
case BYTES:
- memberValue = CassandraSubColumn.getByteBuffer(memberString);
+ memberValue = memberByteBuffer;
+ break;
+ case INT:
+ memberValue =
IntegerSerializer.get().fromByteBuffer(memberByteBuffer);
+ break;
+ case LONG:
+ memberValue =
LongSerializer.get().fromByteBuffer(memberByteBuffer);
+ break;
+ case FLOAT:
+ memberValue =
FloatSerializer.get().fromByteBuffer(memberByteBuffer);
+ break;
+ case DOUBLE:
+ memberValue =
DoubleSerializer.get().fromByteBuffer(memberByteBuffer);
break;
default:
LOG.info("Type for the map value is not supported: " +
valueType);
@@ -97,7 +115,7 @@ public class CassandraSuperColumn extend
if (value instanceof PersistentBase) {
PersistentBase record = (PersistentBase) value;
- for (HColumn<String, String> hColumn :
this.hSuperColumn.getColumns()) {
+ for (HColumn<String, ByteBuffer> hColumn :
this.hSuperColumn.getColumns()) {
Field memberField = fieldSchema.getField(hColumn.getName());
CassandraSubColumn cassandraColumn = new CassandraSubColumn();
cassandraColumn.setField(memberField);
@@ -113,7 +131,7 @@ public class CassandraSuperColumn extend
return value;
}
- public void setValue(HSuperColumn<String, String, String> hSuperColumn) {
+ public void setValue(HSuperColumn<String, String, ByteBuffer> hSuperColumn) {
this.hSuperColumn = hSuperColumn;
}
Modified:
gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java
URL:
http://svn.apache.org/viewvc/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java?rev=1341239&r1=1341238&r2=1341239&view=diff
==============================================================================
---
gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java
(original)
+++
gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java
Mon May 21 22:41:20 2012
@@ -25,7 +25,11 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import me.prettyprint.cassandra.serializers.ByteBufferSerializer;
+import me.prettyprint.cassandra.serializers.FloatSerializer;
+import me.prettyprint.cassandra.serializers.DoubleSerializer;
import me.prettyprint.cassandra.serializers.StringSerializer;
+import me.prettyprint.cassandra.serializers.SerializerTypeInferer;
import me.prettyprint.cassandra.service.CassandraHostConfigurator;
import me.prettyprint.hector.api.Cluster;
import me.prettyprint.hector.api.Keyspace;
@@ -42,7 +46,9 @@ import me.prettyprint.hector.api.query.R
import me.prettyprint.hector.api.query.RangeSuperSlicesQuery;
import me.prettyprint.cassandra.model.ConfigurableConsistencyLevel;
import me.prettyprint.hector.api.HConsistencyLevel;
+import me.prettyprint.hector.api.Serializer;
+import org.apache.avro.util.Utf8;
import org.apache.gora.cassandra.query.CassandraQuery;
import org.apache.gora.mapreduce.GoraRecordReader;
import org.apache.gora.persistency.Persistent;
@@ -56,13 +62,17 @@ public class CassandraClient<K, T extend
private Cluster cluster;
private Keyspace keyspace;
- private Mutator<String> mutator;
+ private Mutator<K> mutator;
+ private Class<K> keyClass;
private CassandraMapping cassandraMapping = new CassandraMapping();
private StringSerializer stringSerializer = new StringSerializer();
+ private ByteBufferSerializer byteBufferSerializer = new
ByteBufferSerializer();
+ private Serializer<K> keySerializer;
- public void initialize() throws Exception {
+ public void initialize(Class<K> keyClass) throws Exception {
+ this.keyClass = keyClass;
this.cassandraMapping.loadConfiguration();
this.cluster =
HFactory.getOrCreateCluster(this.cassandraMapping.getClusterName(), new
CassandraHostConfigurator(this.cassandraMapping.getHostName()));
@@ -72,7 +82,8 @@ public class CassandraClient<K, T extend
// Just create a Keyspace object on the client side, corresponding to an
already existing keyspace with already created column families.
this.keyspace =
HFactory.createKeyspace(this.cassandraMapping.getKeyspaceName(), this.cluster);
- this.mutator = HFactory.createMutator(this.keyspace,
this.stringSerializer);
+ this.keySerializer = SerializerTypeInferer.getSerializer(keyClass);
+ this.mutator = HFactory.createMutator(this.keyspace, this.keySerializer);
}
/**
@@ -125,28 +136,36 @@ public class CassandraClient<K, T extend
* @param fieldName the field name
* @param value the field value.
*/
- public void addColumn(String key, String fieldName, Object value) {
+ public void addColumn(K key, String fieldName, Object value) {
if (value == null) {
return;
}
+
+ ByteBuffer byteBuffer = null;
if (value instanceof ByteBuffer) {
- value = toString((ByteBuffer) value);
+ byteBuffer = (ByteBuffer) value;
+ }
+ else if (value instanceof Utf8) {
+ byteBuffer = stringSerializer.toByteBuffer(((Utf8)value).toString());
+ }
+ else if (value instanceof Float) {
+ // workaround for hector-core-1.0-1.jar
+ // because SerializerTypeInferer.getSerializer(Float ) returns
ObjectSerializer !?
+ byteBuffer = FloatSerializer.get().toByteBuffer((Float)value);
+ }
+ else if (value instanceof Double) {
+ // workaround for hector-core-1.0-1.jar
+ // because SerializerTypeInferer.getSerializer(Double ) returns
ObjectSerializer !?
+ byteBuffer = DoubleSerializer.get().toByteBuffer((Double)value);
+ }
+ else {
+ byteBuffer =
SerializerTypeInferer.getSerializer(value).toByteBuffer(value);
}
String columnFamily = this.cassandraMapping.getFamily(fieldName);
String columnName = this.cassandraMapping.getColumn(fieldName);
- this.mutator.insert(key, columnFamily,
HFactory.createStringColumn(columnName, value.toString()));
- }
-
- /**
- * TODO do no convert bytes to string to store a binary field
- * @param value
- * @return
- */
- private static String toString(ByteBuffer value) {
- ByteBuffer byteBuffer = (ByteBuffer) value;
- return ByteUtils.toString(byteBuffer.array(), 0, byteBuffer.limit());
+ this.mutator.insert(key, columnFamily, HFactory.createColumn(columnName,
byteBuffer, stringSerializer, byteBufferSerializer));
}
/**
@@ -157,19 +176,36 @@ public class CassandraClient<K, T extend
* @param value the member value
*/
@SuppressWarnings("unchecked")
-public void addSubColumn(String key, String fieldName, String memberName,
Object value) {
+public void addSubColumn(K key, String fieldName, String memberName, Object
value) {
if (value == null) {
return;
}
+ ByteBuffer byteBuffer = null;
if (value instanceof ByteBuffer) {
- value = toString((ByteBuffer) value);
+ byteBuffer = (ByteBuffer) value;
+ }
+ else if (value instanceof Utf8) {
+ byteBuffer = stringSerializer.toByteBuffer(((Utf8)value).toString());
+ }
+ else if (value instanceof Float) {
+ // workaround for hector-core-1.0-1.jar
+ // because SerializerTypeInferer.getSerializer(Float ) returns
ObjectSerializer !?
+ byteBuffer = FloatSerializer.get().toByteBuffer((Float)value);
+ }
+ else if (value instanceof Double) {
+ // workaround for hector-core-1.0-1.jar
+ // because SerializerTypeInferer.getSerializer(Double ) returns
ObjectSerializer !?
+ byteBuffer = DoubleSerializer.get().toByteBuffer((Double)value);
+ }
+ else {
+ byteBuffer =
SerializerTypeInferer.getSerializer(value).toByteBuffer(value);
}
String columnFamily = this.cassandraMapping.getFamily(fieldName);
String superColumnName = this.cassandraMapping.getColumn(fieldName);
- this.mutator.insert(key, columnFamily,
HFactory.createSuperColumn(superColumnName,
Arrays.asList(HFactory.createStringColumn(memberName, value.toString())),
this.stringSerializer, this.stringSerializer, this.stringSerializer));
+ this.mutator.insert(key, columnFamily,
HFactory.createSuperColumn(superColumnName,
Arrays.asList(HFactory.createColumn(memberName, byteBuffer, stringSerializer,
byteBufferSerializer)), this.stringSerializer, this.stringSerializer,
this.byteBufferSerializer));
}
@@ -179,23 +215,18 @@ public void addSubColumn(String key, Str
* @param family the family name to be queried
* @return a list of family rows
*/
- public List<Row<String, String, String>> execute(CassandraQuery<K, T>
cassandraQuery, String family) {
+ public List<Row<K, String, ByteBuffer>> execute(CassandraQuery<K, T>
cassandraQuery, String family) {
String[] columnNames = cassandraQuery.getColumns(family);
Query<K, T> query = cassandraQuery.getQuery();
int limit = (int) query.getLimit();
- String startKey = (String) query.getStartKey();
- String endKey = (String) query.getEndKey();
-
- if (startKey == null) {
- startKey = "";
- }
- if (endKey == null) {
- endKey = "";
+ if (limit < 1) {
+ limit = Integer.MAX_VALUE;
}
+ K startKey = query.getStartKey();
+ K endKey = query.getEndKey();
-
- RangeSlicesQuery<String, String, String> rangeSlicesQuery =
HFactory.createRangeSlicesQuery(this.keyspace, this.stringSerializer,
stringSerializer, stringSerializer);
+ RangeSlicesQuery<K, String, ByteBuffer> rangeSlicesQuery =
HFactory.createRangeSlicesQuery(this.keyspace, this.keySerializer,
stringSerializer, byteBufferSerializer);
rangeSlicesQuery.setColumnFamily(family);
rangeSlicesQuery.setKeys(startKey, endKey);
rangeSlicesQuery.setRange("", "", false,
GoraRecordReader.BUFFER_LIMIT_READ_VALUE);
@@ -203,8 +234,8 @@ public void addSubColumn(String key, Str
rangeSlicesQuery.setColumnNames(columnNames);
- QueryResult<OrderedRows<String, String, String>> queryResult =
rangeSlicesQuery.execute();
- OrderedRows<String, String, String> orderedRows = queryResult.get();
+ QueryResult<OrderedRows<K, String, ByteBuffer>> queryResult =
rangeSlicesQuery.execute();
+ OrderedRows<K, String, ByteBuffer> orderedRows = queryResult.get();
return orderedRows.getList();
@@ -259,22 +290,17 @@ public void addSubColumn(String key, Str
return this.cassandraMapping.isSuper(family);
}
- public List<SuperRow<String, String, String, String>>
executeSuper(CassandraQuery<K, T> cassandraQuery, String family) {
+ public List<SuperRow<K, String, String, ByteBuffer>>
executeSuper(CassandraQuery<K, T> cassandraQuery, String family) {
String[] columnNames = cassandraQuery.getColumns(family);
Query<K, T> query = cassandraQuery.getQuery();
int limit = (int) query.getLimit();
- String startKey = (String) query.getStartKey();
- String endKey = (String) query.getEndKey();
-
- if (startKey == null) {
- startKey = "";
+ if (limit < 1) {
+ limit = Integer.MAX_VALUE;
}
- if (endKey == null) {
- endKey = "";
- }
-
+ K startKey = query.getStartKey();
+ K endKey = query.getEndKey();
- RangeSuperSlicesQuery<String, String, String, String>
rangeSuperSlicesQuery = HFactory.createRangeSuperSlicesQuery(this.keyspace,
this.stringSerializer, this.stringSerializer, this.stringSerializer,
this.stringSerializer);
+ RangeSuperSlicesQuery<K, String, String, ByteBuffer> rangeSuperSlicesQuery
= HFactory.createRangeSuperSlicesQuery(this.keyspace, this.keySerializer,
this.stringSerializer, this.stringSerializer, this.byteBufferSerializer);
rangeSuperSlicesQuery.setColumnFamily(family);
rangeSuperSlicesQuery.setKeys(startKey, endKey);
rangeSuperSlicesQuery.setRange("", "", false,
GoraRecordReader.BUFFER_LIMIT_READ_VALUE);
@@ -282,8 +308,8 @@ public void addSubColumn(String key, Str
rangeSuperSlicesQuery.setColumnNames(columnNames);
- QueryResult<OrderedSuperRows<String, String, String, String>> queryResult
= rangeSuperSlicesQuery.execute();
- OrderedSuperRows<String, String, String, String> orderedRows =
queryResult.get();
+ QueryResult<OrderedSuperRows<K, String, String, ByteBuffer>> queryResult =
rangeSuperSlicesQuery.execute();
+ OrderedSuperRows<K, String, String, ByteBuffer> orderedRows =
queryResult.get();
return orderedRows.getList();
Modified:
gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java
URL:
http://svn.apache.org/viewvc/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java?rev=1341239&r1=1341238&r2=1341239&view=diff
==============================================================================
---
gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java
(original)
+++
gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java
Mon May 21 22:41:20 2012
@@ -19,10 +19,12 @@
package org.apache.gora.cassandra.store;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.Properties;
import java.util.Set;
import me.prettyprint.hector.api.beans.ColumnSlice;
@@ -69,7 +71,17 @@ public class CassandraStore<K, T extends
private Map<K, T> buffer = new LinkedHashMap<K, T>();
public CassandraStore() throws Exception {
- this.cassandraClient.initialize();
+ // this.cassandraClient.initialize();
+ }
+
+ public void initialize(Class<K> keyClass, Class<T> persistent, Properties
properties) throws IOException {
+ super.initialize(keyClass, persistent, properties);
+ try {
+ this.cassandraClient.initialize(keyClass);
+ }
+ catch (Exception e) {
+ throw new IOException(e.getMessage(), e);
+ }
}
@Override
@@ -139,22 +151,22 @@ public class CassandraStore<K, T extends
private void addSubColumns(String family, CassandraQuery<K, T>
cassandraQuery,
CassandraResultSet cassandraResultSet) {
// select family columns that are included in the query
- List<Row<String, String, String>> rows =
this.cassandraClient.execute(cassandraQuery, family);
+ List<Row<K, String, ByteBuffer>> rows =
this.cassandraClient.execute(cassandraQuery, family);
- for (Row<String, String, String> row : rows) {
- String key = row.getKey();
+ for (Row<K, String, ByteBuffer> row : rows) {
+ K key = row.getKey();
// find associated row in the resultset
- CassandraRow cassandraRow = cassandraResultSet.getRow(key);
+ CassandraRow<K> cassandraRow = cassandraResultSet.getRow(key);
if (cassandraRow == null) {
- cassandraRow = new CassandraRow();
+ cassandraRow = new CassandraRow<K>();
cassandraResultSet.putRow(key, cassandraRow);
cassandraRow.setKey(key);
}
- ColumnSlice<String, String> columnSlice = row.getColumnSlice();
+ ColumnSlice<String, ByteBuffer> columnSlice = row.getColumnSlice();
- for (HColumn<String, String> hColumn : columnSlice.getColumns()) {
+ for (HColumn<String, ByteBuffer> hColumn : columnSlice.getColumns()) {
CassandraSubColumn cassandraSubColumn = new CassandraSubColumn();
cassandraSubColumn.setValue(hColumn);
cassandraSubColumn.setFamily(family);
@@ -167,18 +179,18 @@ public class CassandraStore<K, T extends
private void addSuperColumns(String family, CassandraQuery<K, T>
cassandraQuery,
CassandraResultSet cassandraResultSet) {
- List<SuperRow<String, String, String, String>> superRows =
this.cassandraClient.executeSuper(cassandraQuery, family);
- for (SuperRow<String, String, String, String> superRow: superRows) {
- String key = superRow.getKey();
- CassandraRow cassandraRow = cassandraResultSet.getRow(key);
+ List<SuperRow<K, String, String, ByteBuffer>> superRows =
this.cassandraClient.executeSuper(cassandraQuery, family);
+ for (SuperRow<K, String, String, ByteBuffer> superRow: superRows) {
+ K key = superRow.getKey();
+ CassandraRow<K> cassandraRow = cassandraResultSet.getRow(key);
if (cassandraRow == null) {
cassandraRow = new CassandraRow();
cassandraResultSet.putRow(key, cassandraRow);
cassandraRow.setKey(key);
}
- SuperSlice<String, String, String> superSlice = superRow.getSuperSlice();
- for (HSuperColumn<String, String, String> hSuperColumn:
superSlice.getSuperColumns()) {
+ SuperSlice<String, String, ByteBuffer> superSlice =
superRow.getSuperSlice();
+ for (HSuperColumn<String, String, ByteBuffer> hSuperColumn:
superSlice.getSuperColumns()) {
CassandraSuperColumn cassandraSuperColumn = new CassandraSuperColumn();
cassandraSuperColumn.setValue(hSuperColumn);
cassandraSuperColumn.setFamily(family);
@@ -209,7 +221,7 @@ public class CassandraStore<K, T extends
Schema schema = value.getSchema();
for (Field field: schema.getFields()) {
if (value.isDirty(field.pos())) {
- addOrUpdateField((String) key, field, value.get(field.pos()));
+ addOrUpdateField(key, field, value.get(field.pos()));
}
}
}
@@ -222,7 +234,6 @@ public class CassandraStore<K, T extends
@Override
public T get(K key, String[] fields) throws IOException {
- LOG.info("get " + key);
CassandraQuery<K,T> query = new CassandraQuery<K,T>();
query.setDataStore(this);
query.setKeyRange(key, key);
@@ -244,13 +255,14 @@ public class CassandraStore<K, T extends
@Override
public String getSchemaName() {
- LOG.info("get schema name");
return null;
}
@Override
public Query<K, T> newQuery() {
- return new CassandraQuery<K, T>(this);
+ Query<K,T> query = new CassandraQuery<K, T>(this);
+ query.setFields(getFieldsToQuery(null));
+ return query;
}
/**
@@ -298,24 +310,16 @@ public class CassandraStore<K, T extends
* @param field the Avro field representing a datum
* @param value the field value
*/
- private void addOrUpdateField(String key, Field field, Object value) {
+ private void addOrUpdateField(K key, Field field, Object value) {
Schema schema = field.schema();
Type type = schema.getType();
- //LOG.info(field.name() + " " + type.name());
switch (type) {
case STRING:
- this.cassandraClient.addColumn(key, field.name(), value);
- break;
case INT:
- this.cassandraClient.addColumn(key, field.name(), value);
- break;
case LONG:
- this.cassandraClient.addColumn(key, field.name(), value);
- break;
case BYTES:
- this.cassandraClient.addColumn(key, field.name(), value);
- break;
case FLOAT:
+ case DOUBLE:
this.cassandraClient.addColumn(key, field.name(), value);
break;
case RECORD:
@@ -333,6 +337,9 @@ public class CassandraStore<K, T extends
}
}
+ if (memberValue instanceof Utf8) {
+ memberValue = memberValue.toString();
+ }
this.cassandraClient.addSubColumn(key, field.name(),
member.name(), memberValue);
}
} else {
@@ -357,6 +364,9 @@ public class CassandraStore<K, T extends
}
}
+ if (keyValue instanceof Utf8) {
+ keyValue = keyValue.toString();
+ }
this.cassandraClient.addSubColumn(key, field.name(),
mapKey.toString(), keyValue);
}
} else {
Modified: gora/trunk/gora-tutorial/conf/gora.properties
URL:
http://svn.apache.org/viewvc/gora/trunk/gora-tutorial/conf/gora.properties?rev=1341239&r1=1341238&r2=1341239&view=diff
==============================================================================
--- gora/trunk/gora-tutorial/conf/gora.properties (original)
+++ gora/trunk/gora-tutorial/conf/gora.properties Mon May 21 22:41:20 2012
@@ -17,10 +17,13 @@
##gora.datastore.default is the default detastore implementation to use
##if it is not passed to the DataStoreFactory#createDataStore() method.
gora.datastore.default=org.apache.gora.hbase.store.HBaseStore
+#gora.datastore.default=org.apache.gora.cassandra.store.CassandraStore
##whether to create schema automatically if not exists.
gora.datastore.autocreateschema=true
+##Cassandra properties for gora-cassandra module using Cassandra
+#gora.cassandrastore.servers=localhost:9160
##JDBC properties for gora-sql module using HSQL
gora.sqlstore.jdbc.driver=org.hsqldb.jdbcDriver
Modified: gora/trunk/gora-tutorial/pom.xml
URL:
http://svn.apache.org/viewvc/gora/trunk/gora-tutorial/pom.xml?rev=1341239&r1=1341238&r2=1341239&view=diff
==============================================================================
--- gora/trunk/gora-tutorial/pom.xml (original)
+++ gora/trunk/gora-tutorial/pom.xml Mon May 21 22:41:20 2012
@@ -126,6 +126,11 @@
<dependency>
<groupId>org.apache.gora</groupId>
+ <artifactId>gora-cassandra</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.gora</groupId>
<artifactId>gora-sql</artifactId>
</dependency>