Author: jbellis
Date: Mon May 24 19:45:48 2010
New Revision: 947771
URL: http://svn.apache.org/viewvc?rev=947771&view=rev
Log:
fix SlicePredicate serialization inside Hadoop jobs. patch by jbellis;
reviewed by eevans for CASSANDRA-1049
Added:
cassandra/branches/cassandra-0.6/test/unit/org/apache/cassandra/hadoop/
cassandra/branches/cassandra-0.6/test/unit/org/apache/cassandra/hadoop/ColumnFamilyInputFormatTest.java
Modified:
cassandra/branches/cassandra-0.6/CHANGES.txt
cassandra/branches/cassandra-0.6/ivy.xml
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/utils/FBUtilities.java
Modified: cassandra/branches/cassandra-0.6/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/CHANGES.txt?rev=947771&r1=947770&r2=947771&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.6/CHANGES.txt Mon May 24 19:45:48 2010
@@ -26,6 +26,7 @@
* expose PhiConvictThreshold (CASSANDRA-1053)
* make repair of RF==1 a no-op (CASSANDRA-1090)
* improve default JVM GC options (CASSANDRA-1014)
+ * fix SlicePredicate serialization inside Hadoop jobs (CASSANDRA-1049)
0.6.1
Modified: cassandra/branches/cassandra-0.6/ivy.xml
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/ivy.xml?rev=947771&r1=947770&r2=947771&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/ivy.xml (original)
+++ cassandra/branches/cassandra-0.6/ivy.xml Mon May 24 19:45:48 2010
@@ -20,6 +20,9 @@
<info organisation="apache-cassandra" module="cassandra"/>
<dependencies>
+ <!-- for Hadoop unit tests -->
+ <dependency org="commons-logging" name="commons-logging" rev="1.1.1"/>
+
<!-- FIXME: paranamer can be dropped after we're depending on avro
(since it depends on them). -->
<dependency org="com.thoughtworks.paranamer"
Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/hadoop/ConfigHelper.java?rev=947771&r1=947770&r2=947771&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/hadoop/ConfigHelper.java (original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/hadoop/ConfigHelper.java Mon May 24 19:45:48 2010
@@ -24,11 +24,12 @@ package org.apache.cassandra.hadoop;
import org.apache.cassandra.thrift.InvalidRequestException;
import org.apache.cassandra.thrift.SlicePredicate;
import org.apache.cassandra.thrift.ThriftValidation;
+import org.apache.cassandra.utils.FBUtilities;
import org.apache.hadoop.conf.Configuration;
import org.apache.thrift.TDeserializer;
import org.apache.thrift.TException;
import org.apache.thrift.TSerializer;
-import org.apache.thrift.protocol.TJSONProtocol;
+import org.apache.thrift.protocol.TBinaryProtocol;
public class ConfigHelper
{
@@ -135,10 +136,10 @@ public class ConfigHelper
{
assert predicate != null;
// this is so awful it's kind of cool!
- TSerializer serializer = new TSerializer(new TJSONProtocol.Factory());
+ TSerializer serializer = new TSerializer(new
TBinaryProtocol.Factory());
try
{
- return serializer.toString(predicate, "UTF-8");
+ return FBUtilities.bytesToHex(serializer.serialize(predicate));
}
catch (TException e)
{
@@ -149,11 +150,11 @@ public class ConfigHelper
private static SlicePredicate predicateFromString(String st)
{
assert st != null;
- TDeserializer deserializer = new TDeserializer(new
TJSONProtocol.Factory());
+ TDeserializer deserializer = new TDeserializer(new
TBinaryProtocol.Factory());
SlicePredicate predicate = new SlicePredicate();
try
{
- deserializer.deserialize(predicate, st, "UTF-8");
+ deserializer.deserialize(predicate, FBUtilities.hexToBytes(st));
}
catch (TException e)
{
Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/utils/FBUtilities.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/utils/FBUtilities.java?rev=947771&r1=947770&r2=947771&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/utils/FBUtilities.java (original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/utils/FBUtilities.java Mon May 24 19:45:48 2010
@@ -22,6 +22,7 @@ import java.io.*;
import java.math.BigInteger;
import java.net.InetAddress;
import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
import java.security.MessageDigest;
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;
@@ -38,7 +39,6 @@ import org.apache.cassandra.config.Datab
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.thrift.SlicePredicate;
import org.apache.thrift.TBase;
import org.apache.thrift.TDeserializer;
import org.apache.thrift.TException;
@@ -480,4 +480,11 @@ public class FBUtilities
}
return utflen;
}
+
+ /**
+ * Encodes a long as a new 8-byte array.
+ * ByteBuffer.wrap uses its default byte order here (big-endian), so the
+ * most significant byte ends up at index 0.
+ *
+ * @param n the value to encode
+ * @return a freshly allocated 8-byte array holding n
+ */
+ public static byte[] toByteArray(long n)
+ {
+ byte[] bytes = new byte[8];
+ // the wrapping buffer writes straight through into 'bytes'
+ ByteBuffer.wrap(bytes).putLong(n);
+ return bytes;
+ }
}
Added: cassandra/branches/cassandra-0.6/test/unit/org/apache/cassandra/hadoop/ColumnFamilyInputFormatTest.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/test/unit/org/apache/cassandra/hadoop/ColumnFamilyInputFormatTest.java?rev=947771&view=auto
==============================================================================
--- cassandra/branches/cassandra-0.6/test/unit/org/apache/cassandra/hadoop/ColumnFamilyInputFormatTest.java (added)
+++ cassandra/branches/cassandra-0.6/test/unit/org/apache/cassandra/hadoop/ColumnFamilyInputFormatTest.java Mon May 24 19:45:48 2010
@@ -0,0 +1,40 @@
+package org.apache.cassandra.hadoop;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.junit.Test;
+
+import org.apache.cassandra.thrift.SlicePredicate;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.hadoop.conf.Configuration;
+
+public class ColumnFamilyInputFormatTest
+{
+ /**
+ * Round-trips a SlicePredicate whose only column name is the raw 8-byte
+ * encoding of a long through a Hadoop Configuration (via ConfigHelper),
+ * and checks that the binary column name comes back byte-for-byte
+ * identical.
+ */
+ @Test
+ public void testSlicePredicate()
+ {
+ // uppercase L: a lowercase 'l' suffix is easily misread as the digit 1
+ long columnValue = 1271253600000L;
+ byte[] columnBytes = FBUtilities.toByteArray(columnValue);
+
+ List<byte[]> columnNames = new ArrayList<byte[]>();
+ columnNames.add(columnBytes);
+ SlicePredicate originalPredicate = new SlicePredicate().setColumn_names(columnNames);
+
+ Configuration conf = new Configuration();
+ ConfigHelper.setSlicePredicate(conf, originalPredicate);
+
+ SlicePredicate rtPredicate = ConfigHelper.getSlicePredicate(conf);
+ assert rtPredicate.column_names.size() == 1;
+ // byte[] has identity equals(), so compare contents explicitly
+ assert Arrays.equals(originalPredicate.column_names.get(0), rtPredicate.column_names.get(0));
+ }
+}