Author: jbellis
Date: Mon May 23 14:40:51 2011
New Revision: 1126518
URL: http://svn.apache.org/viewvc?rev=1126518&view=rev
Log:
switch to native Thrift for Hadoop map/reduce
patch by Mck SembWever; reviewed by tjake and jbellis for CASSANDRA-2667
Removed: cassandra/branches/cassandra-0.8.0/examples/hadoop_streaming_output/ cassandra/branches/cassandra-0.8.0/src/java/org/apache/cassandra/hadoop/streaming/ Modified: cassandra/branches/cassandra-0.8.0/CHANGES.txt cassandra/branches/cassandra-0.8.0/NEWS.txt cassandra/branches/cassandra-0.8.0/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java cassandra/branches/cassandra-0.8.0/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java Modified: cassandra/branches/cassandra-0.8.0/CHANGES.txt URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8.0/CHANGES.txt?rev=1126518&r1=1126517&r2=1126518&view=diff ============================================================================== --- cassandra/branches/cassandra-0.8.0/CHANGES.txt (original) +++ cassandra/branches/cassandra-0.8.0/CHANGES.txt Mon May 23 14:40:51 2011 @@ -4,6 +4,7 @@ * update CQL consistency levels (CASSANDRA-2566) * debian packaging fixes (CASSANDRA-2481, 2647) * fix UUIDType, IntegerType for direct buffers (CASSANDRA-2682, 2684) + * switch to native Thrift for Hadoop map/reduce (CASSANDRA-2667) 0.8.0-rc1 Modified: cassandra/branches/cassandra-0.8.0/NEWS.txt URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8.0/NEWS.txt?rev=1126518&r1=1126517&r2=1126518&view=diff ============================================================================== --- cassandra/branches/cassandra-0.8.0/NEWS.txt (original) +++ cassandra/branches/cassandra-0.8.0/NEWS.txt Mon May 23 14:40:51 2011 @@ -13,8 +13,10 @@ Upgrading - 0.8 is fully API-compatible with 0.7. You can continue to use your 0.7 clients. - Avro record classes used in map/reduce and Hadoop streaming code have - moved from org.apache.cassandra.avro to org.apache.cassandra.hadoop.avro, - applications using these classes will need to be updated accordingly. + been removed. 
Map/reduce can be switched to Thrift by changing + org.apache.cassandra.avro in import statements to + org.apache.cassandra.thrift (no class names change). Streaming support + has been removed for the time being. - The loadbalance command has been removed from nodetool. For similar behavior, decommission then rebootstrap with empty initial_token. Modified: cassandra/branches/cassandra-0.8.0/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8.0/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java?rev=1126518&r1=1126517&r2=1126518&view=diff ============================================================================== --- cassandra/branches/cassandra-0.8.0/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java (original) +++ cassandra/branches/cassandra-0.8.0/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java Mon May 23 14:40:51 2011 @@ -31,7 +31,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.auth.SimpleAuthenticator; -import org.apache.cassandra.hadoop.avro.Mutation; import org.apache.cassandra.thrift.*; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapreduce.*; Modified: cassandra/branches/cassandra-0.8.0/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8.0/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java?rev=1126518&r1=1126517&r2=1126518&view=diff ============================================================================== --- cassandra/branches/cassandra-0.8.0/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java (original) +++ cassandra/branches/cassandra-0.8.0/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java Mon May 23 14:40:51 2011 @@ -59,8 +59,8 @@ import org.apache.cassandra.utils.ByteBu * @see OutputFormat * */ -final class 
ColumnFamilyRecordWriter extends RecordWriter<ByteBuffer,List<org.apache.cassandra.hadoop.avro.Mutation>> -implements org.apache.hadoop.mapred.RecordWriter<ByteBuffer,List<org.apache.cassandra.hadoop.avro.Mutation>> +final class ColumnFamilyRecordWriter extends RecordWriter<ByteBuffer,List<Mutation>> +implements org.apache.hadoop.mapred.RecordWriter<ByteBuffer,List<Mutation>> { // The configuration this writer is associated with. private final Configuration conf; @@ -122,7 +122,7 @@ implements org.apache.hadoop.mapred.Reco * @throws IOException */ @Override - public void write(ByteBuffer keybuff, List<org.apache.cassandra.hadoop.avro.Mutation> value) throws IOException + public void write(ByteBuffer keybuff, List<Mutation> value) throws IOException { Range range = ringCache.getRange(keybuff); @@ -136,77 +136,8 @@ implements org.apache.hadoop.mapred.Reco clients.put(range, client); } - for (org.apache.cassandra.hadoop.avro.Mutation amut : value) - client.put(new Pair<ByteBuffer,Mutation>(keybuff, avroToThrift(amut))); - } - - /** - * Deep copies the given Avro mutation into a new Thrift mutation. 
- */ - private Mutation avroToThrift(org.apache.cassandra.hadoop.avro.Mutation amut) - { - Mutation mutation = new Mutation(); - org.apache.cassandra.hadoop.avro.ColumnOrSuperColumn acosc = amut.column_or_supercolumn; - if (acosc == null) - { - // deletion - assert amut.deletion != null; - Deletion deletion = new Deletion().setTimestamp(amut.deletion.timestamp); - mutation.setDeletion(deletion); - - org.apache.cassandra.hadoop.avro.SlicePredicate apred = amut.deletion.predicate; - if (apred == null && amut.deletion.super_column == null) - { - // leave Deletion alone to delete entire row - } - else if (amut.deletion.super_column != null) - { - // super column - deletion.setSuper_column(ByteBufferUtil.getArray(amut.deletion.super_column)); - } - else if (apred.column_names != null) - { - // column names - List<ByteBuffer> names = new ArrayList<ByteBuffer>(apred.column_names.size()); - for (ByteBuffer name : apred.column_names) - names.add(name); - deletion.setPredicate(new SlicePredicate().setColumn_names(names)); - } - else - { - // range - deletion.setPredicate(new SlicePredicate().setSlice_range(avroToThrift(apred.slice_range))); - } - } - else - { - // creation - ColumnOrSuperColumn cosc = new ColumnOrSuperColumn(); - mutation.setColumn_or_supercolumn(cosc); - if (acosc.column != null) - // standard column - cosc.setColumn(avroToThrift(acosc.column)); - else - { - // super column - ByteBuffer scolname = acosc.super_column.name; - List<Column> scolcols = new ArrayList<Column>(acosc.super_column.columns.size()); - for (org.apache.cassandra.hadoop.avro.Column acol : acosc.super_column.columns) - scolcols.add(avroToThrift(acol)); - cosc.setSuper_column(new SuperColumn(scolname, scolcols)); - } - } - return mutation; - } - - private SliceRange avroToThrift(org.apache.cassandra.hadoop.avro.SliceRange asr) - { - return new SliceRange(asr.start, asr.finish, asr.reversed, asr.count); - } - - private Column avroToThrift(org.apache.cassandra.hadoop.avro.Column acol) - { - 
return new Column(acol.name).setValue(acol.value).setTimestamp(acol.timestamp); + for (Mutation amut : value) + client.put(new Pair<ByteBuffer,Mutation>(keybuff, amut)); } /**