Author: jbellis
Date: Wed Nov 10 21:08:01 2010
New Revision: 1033713
URL: http://svn.apache.org/viewvc?rev=1033713&view=rev
Log:
update pig for ByteBuffers.
patch by tjake; reviewed by Jeremy Hanna for CASSANDRA-1725
Modified:
cassandra/branches/cassandra-0.7/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
cassandra/branches/cassandra-0.7/contrib/word_count/src/WordCount.java
Modified:
cassandra/branches/cassandra-0.7/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java?rev=1033713&r1=1033712&r2=1033713&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
(original)
+++
cassandra/branches/cassandra-0.7/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
Wed Nov 10 21:08:01 2010
@@ -68,18 +68,18 @@ public class CassandraStorage extends Lo
// load the next pair
if (!reader.nextKeyValue())
return null;
- byte[] key = (byte[])reader.getCurrentKey();
- SortedMap<byte[],IColumn> cf =
(SortedMap<byte[],IColumn>)reader.getCurrentValue();
+
+ ByteBuffer key = (ByteBuffer)reader.getCurrentKey();
+ SortedMap<ByteBuffer,IColumn> cf =
(SortedMap<ByteBuffer,IColumn>)reader.getCurrentValue();
assert key != null && cf != null;
// and wrap it in a tuple
- Tuple tuple = TupleFactory.getInstance().newTuple(2);
+ Tuple tuple = TupleFactory.getInstance().newTuple(2);
ArrayList<Tuple> columns = new ArrayList<Tuple>();
- tuple.set(0, new DataByteArray(key));
- for (Map.Entry<byte[], IColumn> entry : cf.entrySet())
+ tuple.set(0, new DataByteArray(key.array(),
key.position()+key.arrayOffset(), key.limit()+key.arrayOffset()));
+ for (Map.Entry<ByteBuffer, IColumn> entry : cf.entrySet())
{
- byte[] name = entry.getKey();
- columns.add(columnToTuple(name, 0, name.length,
entry.getValue()));
+ columns.add(columnToTuple(entry.getKey(), entry.getValue()));
}
tuple.set(1, new DefaultDataBag(columns));
@@ -91,25 +91,23 @@ public class CassandraStorage extends Lo
}
}
- private Tuple columnToTuple(byte[] name, int nameOffset, int nameLength,
IColumn col) throws IOException
+ private Tuple columnToTuple(ByteBuffer name, IColumn col) throws
IOException
{
Tuple pair = TupleFactory.getInstance().newTuple(2);
- pair.set(0, new DataByteArray(name, nameOffset, nameLength));
+ pair.set(0, new DataByteArray(name.array(),
name.position()+name.arrayOffset(), name.limit()+name.arrayOffset()));
if (col instanceof Column)
{
// standard
pair.set(1, new DataByteArray(col.value().array(),
col.value().position()+col.value().arrayOffset(),
- col.value().remaining()));
+
col.value().limit()+col.value().arrayOffset()));
return pair;
}
// super
ArrayList<Tuple> subcols = new ArrayList<Tuple>();
for (IColumn subcol : ((SuperColumn)col).getSubColumns())
- subcols.add(columnToTuple(subcol.name().array(),
-
subcol.name().position()+subcol.name().arrayOffset(),
- subcol.name().remaining(), subcol));
+ subcols.add(columnToTuple(subcol.name(), subcol));
pair.set(1, new DefaultDataBag(subcols));
return pair;
Modified: cassandra/branches/cassandra-0.7/contrib/word_count/src/WordCount.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/contrib/word_count/src/WordCount.java?rev=1033713&r1=1033712&r2=1033713&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/contrib/word_count/src/WordCount.java
(original)
+++ cassandra/branches/cassandra-0.7/contrib/word_count/src/WordCount.java Wed
Nov 10 21:08:01 2010
@@ -70,15 +70,15 @@ public class WordCount extends Configure
System.exit(0);
}
- public static class TokenizerMapper extends Mapper<byte[],
SortedMap<byte[], IColumn>, Text, IntWritable>
+ public static class TokenizerMapper extends Mapper<ByteBuffer,
SortedMap<ByteBuffer, IColumn>, Text, IntWritable>
{
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
- private String columnName;
+ private ByteBuffer columnName;
- public void map(byte[] key, SortedMap<byte[], IColumn> columns,
Context context) throws IOException, InterruptedException
+ public void map(ByteBuffer key, SortedMap<ByteBuffer, IColumn>
columns, Context context) throws IOException, InterruptedException
{
- IColumn column = columns.get(columnName.getBytes());
+ IColumn column = columns.get(columnName);
if (column == null)
return;
String value = ByteBufferUtil.string(column.value());
@@ -95,7 +95,7 @@ public class WordCount extends Configure
protected void setup(org.apache.hadoop.mapreduce.Mapper.Context
context)
throws IOException, InterruptedException
{
- this.columnName = context.getConfiguration().get(CONF_COLUMN_NAME);
+ this.columnName =
ByteBuffer.wrap(context.getConfiguration().get(CONF_COLUMN_NAME).getBytes());
}
}