Author: kazk
Date: Fri Jul 20 07:00:12 2012
New Revision: 1363667
URL: http://svn.apache.org/viewvc?rev=1363667&view=rev
Log:
Fixes GORA-153 to correctly handle DELETED State for MAP
Modified:
gora/trunk/CHANGES.txt
gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/StatefulHashMapSerializer.java
gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java
gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java
gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/HectorUtils.java
gora/trunk/pom.xml
Modified: gora/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/gora/trunk/CHANGES.txt?rev=1363667&r1=1363666&r2=1363667&view=diff
==============================================================================
--- gora/trunk/CHANGES.txt (original)
+++ gora/trunk/CHANGES.txt Fri Jul 20 07:00:12 2012
@@ -6,6 +6,8 @@ Gora Change Log
0.3 (trunk) Current Development:
+* GORA-153 gora-cassandra does not correctly handle DELETED State for MAP (kazk)
+
* GORA-152 gora-core test incorrectly uses ByteBuffer's array() method to get
its byte array (kazk)
* GORA-151 CassandraStore's schemaExists() method always returns false (kazk)
Modified:
gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/StatefulHashMapSerializer.java
URL:
http://svn.apache.org/viewvc/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/StatefulHashMapSerializer.java?rev=1363667&r1=1363666&r2=1363667&view=diff
==============================================================================
---
gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/StatefulHashMapSerializer.java
(original)
+++
gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/StatefulHashMapSerializer.java
Fri Jul 20 07:00:12 2012
@@ -36,6 +36,7 @@ import org.apache.avro.Schema;
import org.apache.avro.Schema.Type;
import org.apache.avro.specific.SpecificFixed;
import org.apache.avro.util.Utf8;
+import org.apache.gora.persistency.State;
import org.apache.gora.persistency.StatefulHashMap;
import org.slf4j.Logger;
@@ -134,13 +135,16 @@ public class StatefulHashMapSerializer<T
}
private ByteBuffer toByteBufferWithFixedLengthElements(StatefulHashMap<Utf8,
T> map) {
- int n = (int) map.size();
- List<byte[]> list = new ArrayList<byte[]>(n);
- n *= 4;
+ List<byte[]> list = new ArrayList<byte[]>(map.size());
+ int n = 0;
for (Utf8 key : map.keySet()) {
+ if (map.getState(key) == State.DELETED) {
+ continue;
+ }
T value = map.get(key);
byte[] bytes =
BytesArraySerializer.get().fromByteBuffer(Utf8Serializer.get().toByteBuffer(key));
list.add(bytes);
+ n += 4;
n += bytes.length;
bytes =
BytesArraySerializer.get().fromByteBuffer(valueSerializer.toByteBuffer(value));
list.add(bytes);
@@ -160,16 +164,20 @@ public class StatefulHashMapSerializer<T
}
private ByteBuffer
toByteBufferWithVariableLengthElements(StatefulHashMap<Utf8, T> map) {
- int n = (int) map.size();
- List<byte[]> list = new ArrayList<byte[]>(n);
- n *= 8;
+ List<byte[]> list = new ArrayList<byte[]>(map.size());
+ int n = 0;
for (Utf8 key : map.keySet()) {
+ if (map.getState(key) == State.DELETED) {
+ continue;
+ }
T value = map.get(key);
byte[] bytes =
BytesArraySerializer.get().fromByteBuffer(Utf8Serializer.get().toByteBuffer(key));
list.add(bytes);
+ n += 4;
n += bytes.length;
bytes =
BytesArraySerializer.get().fromByteBuffer(valueSerializer.toByteBuffer(value));
list.add(bytes);
+ n += 4;
n += bytes.length;
}
ByteBuffer byteBuffer = ByteBuffer.allocate(n);
Modified:
gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java
URL:
http://svn.apache.org/viewvc/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java?rev=1363667&r1=1363666&r2=1363667&view=diff
==============================================================================
---
gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java
(original)
+++
gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java
Fri Jul 20 07:00:12 2012
@@ -56,6 +56,7 @@ import org.apache.gora.cassandra.seriali
import org.apache.gora.cassandra.serializers.TypeUtils;
import org.apache.gora.mapreduce.GoraRecordReader;
import org.apache.gora.persistency.Persistent;
+import org.apache.gora.persistency.State;
import org.apache.gora.persistency.StatefulHashMap;
import org.apache.gora.query.Query;
import org.apache.gora.util.ByteUtils;
@@ -198,6 +199,26 @@ public class CassandraClient<K, T extend
}
+ /**
+ * Delete a member in a super column. This is used for map and record Avro types.
+ * @param key the row key
+ * @param fieldName the field name
+ * @param columnName the column name (the member name, or the index of array)
+ */
+ @SuppressWarnings("unchecked")
+ public void deleteSubColumn(K key, String fieldName, ByteBuffer columnName) {
+
+ String columnFamily = this.cassandraMapping.getFamily(fieldName);
+ String superColumnName = this.cassandraMapping.getColumn(fieldName);
+
+ HectorUtils.deleteSubColumn(mutator, key, columnFamily, superColumnName,
columnName);
+ }
+
+ public void deleteSubColumn(K key, String fieldName, String columnName) {
+ deleteSubColumn(key, fieldName,
StringSerializer.get().toByteBuffer(columnName));
+ }
+
+
@SuppressWarnings("unchecked")
public void addGenericArray(K key, String fieldName, GenericArray array) {
if (isSuper( cassandraMapping.getFamily(fieldName) )) {
@@ -228,6 +249,10 @@ public class CassandraClient<K, T extend
if (isSuper( cassandraMapping.getFamily(fieldName) )) {
int i= 0;
for (Utf8 mapKey: map.keySet()) {
+ if (map.getState(mapKey) == State.DELETED) {
+ deleteSubColumn(key, fieldName, mapKey.toString());
+ continue;
+ }
// TODO: hack, do not store empty arrays
Object mapValue = map.get(mapKey);
Modified:
gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java
URL:
http://svn.apache.org/viewvc/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java?rev=1363667&r1=1363666&r2=1363667&view=diff
==============================================================================
---
gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java
(original)
+++
gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java
Fri Jul 20 07:00:12 2012
@@ -295,9 +295,7 @@ public class CassandraStore<K, T extends
fieldValue = newRecord;
break;
case MAP:
- StatefulHashMap<?, ?> map = (StatefulHashMap<?, ?>) fieldValue;
- StatefulHashMap<?, ?> newMap = new StatefulHashMap(map);
- fieldValue = newMap;
+ // needs to keep State.DELETED.
break;
case ARRAY:
GenericArray array = (GenericArray) fieldValue;
Modified:
gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/HectorUtils.java
URL:
http://svn.apache.org/viewvc/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/HectorUtils.java?rev=1363667&r1=1363666&r2=1363667&view=diff
==============================================================================
---
gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/HectorUtils.java
(original)
+++
gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/HectorUtils.java
Fri Jul 20 07:00:12 2012
@@ -74,6 +74,11 @@ public class HectorUtils<K,T extends Per
}
+ public static<K> void deleteSubColumn(Mutator<K> mutator, K key, String
columnFamily, String superColumnName, ByteBuffer columnName) {
+ mutator.subDelete(key, columnFamily, superColumnName, columnName,
StringSerializer.get(), ByteBufferSerializer.get());
+ }
+
+
public static<K> HSuperColumn<String,ByteBuffer,ByteBuffer>
createSuperColumn(String superColumnName, ByteBuffer columnName, ByteBuffer
columnValue) {
return HFactory.createSuperColumn(superColumnName,
Arrays.asList(createColumn(columnName, columnValue)), StringSerializer.get(),
ByteBufferSerializer.get(), ByteBufferSerializer.get());
}
Modified: gora/trunk/pom.xml
URL:
http://svn.apache.org/viewvc/gora/trunk/pom.xml?rev=1363667&r1=1363666&r2=1363667&view=diff
==============================================================================
--- gora/trunk/pom.xml (original)
+++ gora/trunk/pom.xml Fri Jul 20 07:00:12 2012
@@ -523,7 +523,7 @@
<avro.version>1.3.3</avro.version>
<cxf-rt-frontend-jaxrs.version>2.5.2</cxf-rt-frontend-jaxrs.version>
<!-- Cassandra Dependencies -->
- <cassandra.version>1.1.0</cassandra.version>
+ <cassandra.version>1.1.2</cassandra.version>
<libthrift.version>0.7.0</libthrift.version>
<hector.version>1.1-0</hector.version>
<!-- Misc Dependencies -->