Author: jbellis
Date: Mon May 23 14:40:51 2011
New Revision: 1126518
URL: http://svn.apache.org/viewvc?rev=1126518&view=rev
Log:
switch to native Thrift for Hadoop map/reduce
patch by Mck SembWever; reviewed by tjake and jbellis for CASSANDRA-2667
Removed:
cassandra/branches/cassandra-0.8.0/examples/hadoop_streaming_output/
cassandra/branches/cassandra-0.8.0/src/java/org/apache/cassandra/hadoop/streaming/
Modified:
cassandra/branches/cassandra-0.8.0/CHANGES.txt
cassandra/branches/cassandra-0.8.0/NEWS.txt
cassandra/branches/cassandra-0.8.0/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java
cassandra/branches/cassandra-0.8.0/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java
Modified: cassandra/branches/cassandra-0.8.0/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8.0/CHANGES.txt?rev=1126518&r1=1126517&r2=1126518&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8.0/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.8.0/CHANGES.txt Mon May 23 14:40:51 2011
@@ -4,6 +4,7 @@
  * update CQL consistency levels (CASSANDRA-2566)
  * debian packaging fixes (CASSANDRA-2481, 2647)
  * fix UUIDType, IntegerType for direct buffers (CASSANDRA-2682, 2684)
+ * switch to native Thrift for Hadoop map/reduce (CASSANDRA-2667)
 
 0.8.0-rc1
Modified: cassandra/branches/cassandra-0.8.0/NEWS.txt
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8.0/NEWS.txt?rev=1126518&r1=1126517&r2=1126518&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8.0/NEWS.txt (original)
+++ cassandra/branches/cassandra-0.8.0/NEWS.txt Mon May 23 14:40:51 2011
@@ -13,8 +13,10 @@ Upgrading
     - 0.8 is fully API-compatible with 0.7. You can continue
       to use your 0.7 clients.
     - Avro record classes used in map/reduce and Hadoop streaming code have
-      moved from org.apache.cassandra.avro to org.apache.cassandra.hadoop.avro,
-      applications using these classes will need to be updated accordingly.
+      been removed. Map/reduce can be switched to Thrift by changing
+      org.apache.cassandra.avro in import statements to
+      org.apache.cassandra.thrift (no class names change). Streaming support
+      has been removed for the time being.
     - The loadbalance command has been removed from nodetool. For similar
       behavior, decommission then rebootstrap with empty initial_token.
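
For upgrading applications, the change described above is a package rename
only. A minimal sketch of the migration (the surrounding job code is
hypothetical; the package names are taken from the diffs below, which drop
org.apache.cassandra.hadoop.avro.Mutation in favor of the Thrift classes):

    // Before this commit (Avro record classes, now removed):
    // import org.apache.cassandra.hadoop.avro.Mutation;

    // After this commit (native Thrift; the class name is unchanged):
    import org.apache.cassandra.thrift.Mutation;

Jobs whose output is typed as RecordWriter<ByteBuffer,List<Mutation>> should
otherwise compile unchanged, since only the package of Mutation differs.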
Modified: cassandra/branches/cassandra-0.8.0/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8.0/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java?rev=1126518&r1=1126517&r2=1126518&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8.0/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java (original)
+++ cassandra/branches/cassandra-0.8.0/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java Mon May 23 14:40:51 2011
@@ -31,7 +31,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.auth.SimpleAuthenticator;
-import org.apache.cassandra.hadoop.avro.Mutation;
 import org.apache.cassandra.thrift.*;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.*;
Modified: cassandra/branches/cassandra-0.8.0/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8.0/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java?rev=1126518&r1=1126517&r2=1126518&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8.0/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java (original)
+++ cassandra/branches/cassandra-0.8.0/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java Mon May 23 14:40:51 2011
@@ -59,8 +59,8 @@ import org.apache.cassandra.utils.ByteBu
  * @see OutputFormat
  *
  */
-final class ColumnFamilyRecordWriter extends RecordWriter<ByteBuffer,List<org.apache.cassandra.hadoop.avro.Mutation>>
-implements org.apache.hadoop.mapred.RecordWriter<ByteBuffer,List<org.apache.cassandra.hadoop.avro.Mutation>>
+final class ColumnFamilyRecordWriter extends RecordWriter<ByteBuffer,List<Mutation>>
+implements org.apache.hadoop.mapred.RecordWriter<ByteBuffer,List<Mutation>>
 {
     // The configuration this writer is associated with.
     private final Configuration conf;
@@ -122,7 +122,7 @@ implements org.apache.hadoop.mapred.Reco
      * @throws IOException
      */
     @Override
-    public void write(ByteBuffer keybuff, List<org.apache.cassandra.hadoop.avro.Mutation> value) throws IOException
+    public void write(ByteBuffer keybuff, List<Mutation> value) throws IOException
     {
         Range range = ringCache.getRange(keybuff);
@@ -136,77 +136,8 @@ implements org.apache.hadoop.mapred.Reco
             clients.put(range, client);
         }
 
-        for (org.apache.cassandra.hadoop.avro.Mutation amut : value)
-            client.put(new Pair<ByteBuffer,Mutation>(keybuff, avroToThrift(amut)));
-    }
-
-    /**
-     * Deep copies the given Avro mutation into a new Thrift mutation.
-     */
-    private Mutation avroToThrift(org.apache.cassandra.hadoop.avro.Mutation amut)
-    {
-        Mutation mutation = new Mutation();
-        org.apache.cassandra.hadoop.avro.ColumnOrSuperColumn acosc = amut.column_or_supercolumn;
-        if (acosc == null)
-        {
-            // deletion
-            assert amut.deletion != null;
-            Deletion deletion = new Deletion().setTimestamp(amut.deletion.timestamp);
-            mutation.setDeletion(deletion);
-
-            org.apache.cassandra.hadoop.avro.SlicePredicate apred = amut.deletion.predicate;
-            if (apred == null && amut.deletion.super_column == null)
-            {
-                // leave Deletion alone to delete entire row
-            }
-            else if (amut.deletion.super_column != null)
-            {
-                // super column
-                deletion.setSuper_column(ByteBufferUtil.getArray(amut.deletion.super_column));
-            }
-            else if (apred.column_names != null)
-            {
-                // column names
-                List<ByteBuffer> names = new ArrayList<ByteBuffer>(apred.column_names.size());
-                for (ByteBuffer name : apred.column_names)
-                    names.add(name);
-                deletion.setPredicate(new SlicePredicate().setColumn_names(names));
-            }
-            else
-            {
-                // range
-                deletion.setPredicate(new SlicePredicate().setSlice_range(avroToThrift(apred.slice_range)));
-            }
-        }
-        else
-        {
-            // creation
-            ColumnOrSuperColumn cosc = new ColumnOrSuperColumn();
-            mutation.setColumn_or_supercolumn(cosc);
-            if (acosc.column != null)
-                // standard column
-                cosc.setColumn(avroToThrift(acosc.column));
-            else
-            {
-                // super column
-                ByteBuffer scolname = acosc.super_column.name;
-                List<Column> scolcols = new ArrayList<Column>(acosc.super_column.columns.size());
-                for (org.apache.cassandra.hadoop.avro.Column acol : acosc.super_column.columns)
-                    scolcols.add(avroToThrift(acol));
-                cosc.setSuper_column(new SuperColumn(scolname, scolcols));
-            }
-        }
-        return mutation;
-    }
-
-    private SliceRange avroToThrift(org.apache.cassandra.hadoop.avro.SliceRange asr)
-    {
-        return new SliceRange(asr.start, asr.finish, asr.reversed, asr.count);
-    }
-
-    private Column avroToThrift(org.apache.cassandra.hadoop.avro.Column acol)
-    {
-        return new Column(acol.name).setValue(acol.value).setTimestamp(acol.timestamp);
+        for (Mutation amut : value)
+            client.put(new Pair<ByteBuffer,Mutation>(keybuff, amut));
     }
 
     /**
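
For reference, a minimal sketch of building output mutations against the
native Thrift classes after this change. The row key, column name, and
reducer context are hypothetical; the construction pattern (a Mutation
wrapping a ColumnOrSuperColumn, chained Column setters) mirrors the removed
avroToThrift conversion above:

    import java.nio.ByteBuffer;
    import java.util.Arrays;
    import java.util.List;
    import org.apache.cassandra.thrift.Column;
    import org.apache.cassandra.thrift.ColumnOrSuperColumn;
    import org.apache.cassandra.thrift.Mutation;
    import org.apache.cassandra.utils.ByteBufferUtil;

    // Build a single insertion; ColumnFamilyOutputFormat now consumes
    // (ByteBuffer key, List<Mutation>) pairs directly.
    Column col = new Column(ByteBufferUtil.bytes("count"))
                     .setValue(ByteBufferUtil.bytes(42L))
                     .setTimestamp(System.currentTimeMillis());

    ColumnOrSuperColumn cosc = new ColumnOrSuperColumn();
    cosc.setColumn(col);

    Mutation mutation = new Mutation();
    mutation.setColumn_or_supercolumn(cosc);

    List<Mutation> mutations = Arrays.asList(mutation);
    // In a Reducer, the pair would then be emitted as, e.g.:
    // context.write(ByteBufferUtil.bytes("row-key"), mutations);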