Updated Branches:
  refs/heads/trunk 087d24a9d -> fba541c0c
Add wide row support to ColumnFamilyInputFormat

Patch by jbellis; reviewed by tjake for CASSANDRA-3264


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/fba541c0
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/fba541c0
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/fba541c0

Branch: refs/heads/trunk
Commit: fba541c0c33a2e966c0cb23cb1cdabce9c330d26
Parents: 087d24a
Author: T Jake Luciani <[email protected]>
Authored: Wed Jan 25 15:43:55 2012 -0500
Committer: T Jake Luciani <[email protected]>
Committed: Wed Jan 25 15:43:55 2012 -0500

----------------------------------------------------------------------
 CHANGES.txt                                         |    1 +
 examples/hadoop_word_count/src/WordCount.java       |   42 +-
 examples/hadoop_word_count/src/WordCountSetup.java  |    2 +-
 interface/cassandra.thrift                          |   11 +-
 .../org/apache/cassandra/thrift/Cassandra.java      | 3273 ++++++++++++++++-----
 .../org/apache/cassandra/thrift/Constants.java      |    2 +-
 .../cassandra/hadoop/ColumnFamilyRecordReader.java  |  243 +-
 .../org/apache/cassandra/hadoop/ConfigHelper.java   |   31 +-
 .../apache/cassandra/thrift/CassandraServer.java    |   55 +
 9 files changed, 2606 insertions(+), 1054 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/fba541c0/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 03d2e69..74ee0e9 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -56,6 +56,7 @@
  * Make CFMetaData conversions to/from thrift/native schema inverses
    (CASSANDRA_3559)
  * Add initial code for CQL 3.0-beta (CASSANDRA-3781)
+ * Add wide row support for ColumnFamilyInputFormat (CASSANDRA-3264)
 
 
 1.0.8


http://git-wip-us.apache.org/repos/asf/cassandra/blob/fba541c0/examples/hadoop_word_count/bin/word_count_counters
----------------------------------------------------------------------
diff --git a/examples/hadoop_word_count/bin/word_count_counters b/examples/hadoop_word_count/bin/word_count_counters
old mode 100644
new mode 100755


http://git-wip-us.apache.org/repos/asf/cassandra/blob/fba541c0/examples/hadoop_word_count/src/WordCount.java
----------------------------------------------------------------------
diff --git a/examples/hadoop_word_count/src/WordCount.java b/examples/hadoop_word_count/src/WordCount.java
index e1c70bb..d3cee0e 100644
--- a/examples/hadoop_word_count/src/WordCount.java
+++ b/examples/hadoop_word_count/src/WordCount.java
@@ -25,8 +25,6 @@ import org.apache.cassandra.hadoop.ColumnFamilyOutputFormat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static com.google.common.base.Charsets.UTF_8;
-
 import org.apache.cassandra.db.IColumn;
 import org.apache.cassandra.hadoop.ColumnFamilyInputFormat;
 import org.apache.cassandra.hadoop.ConfigHelper;
@@ -81,22 +79,28 @@ public class WordCount extends Configured implements Tool
         protected void setup(org.apache.hadoop.mapreduce.Mapper.Context context)
         throws IOException, InterruptedException
         {
-            sourceColumn = ByteBufferUtil.bytes(context.getConfiguration().get(CONF_COLUMN_NAME));
         }
 
         public void map(ByteBuffer key, SortedMap<ByteBuffer, IColumn> columns, Context context) throws IOException, InterruptedException
         {
-            IColumn column = columns.get(sourceColumn);
-            if (column == null)
-                return;
-            String value = ByteBufferUtil.string(column.value());
-            logger.debug("read " + key + ":" + value + " from " + context.getInputSplit());
-
-            StringTokenizer itr = new StringTokenizer(value);
-            while (itr.hasMoreTokens())
+            for (IColumn column : columns.values())
             {
-                word.set(itr.nextToken());
-                context.write(word, one);
+                String name = ByteBufferUtil.string(column.name());
+                String value = null;
+
+                if (name.contains("int"))
+                    value = String.valueOf(ByteBufferUtil.toInt(column.value()));
+                else
+                    value = ByteBufferUtil.string(column.value());
+
+                System.err.println("read " + ByteBufferUtil.string(key) + ":" +name + ":" + value + " from " + context.getInputSplit());
+
+                StringTokenizer itr = new StringTokenizer(value);
+                while (itr.hasMoreTokens())
+                {
+                    word.set(itr.nextToken());
+                    context.write(word, one);
+                }
             }
         }
     }
@@ -155,10 +159,12 @@ public class WordCount extends Configured implements Tool
         }
         logger.info("output reducer type: " + outputReducerType);
 
+        // use a smaller page size that doesn't divide the row count evenly to exercise the paging logic better
+        ConfigHelper.setRangeBatchSize(getConf(), 99);
+
         for (int i = 0; i < WordCountSetup.TEST_COUNT; i++)
         {
             String columnName = "text" + i;
-            getConf().set(CONF_COLUMN_NAME, columnName);
 
             Job job = new Job(getConf(), "wordcount");
             job.setJarByClass(WordCount.class);
@@ -184,6 +190,7 @@ public class WordCount extends Configured implements Tool
                 job.setOutputFormatClass(ColumnFamilyOutputFormat.class);
 
                 ConfigHelper.setOutputColumnFamily(job.getConfiguration(), KEYSPACE, OUTPUT_COLUMN_FAMILY);
+                job.getConfiguration().set(CONF_COLUMN_NAME, "sum");
             }
 
             job.setInputFormatClass(ColumnFamilyInputFormat.class);
@@ -194,12 +201,19 @@ public class WordCount extends Configured implements Tool
             ConfigHelper.setInputColumnFamily(job.getConfiguration(), KEYSPACE, COLUMN_FAMILY);
             SlicePredicate predicate = new SlicePredicate().setColumn_names(Arrays.asList(ByteBufferUtil.bytes(columnName)));
             ConfigHelper.setInputSlicePredicate(job.getConfiguration(), predicate);
+
             if (i == 4)
             {
                 IndexExpression expr = new IndexExpression(ByteBufferUtil.bytes("int4"), IndexOperator.EQ, ByteBufferUtil.bytes(0));
                 ConfigHelper.setInputRange(job.getConfiguration(), Arrays.asList(expr));
             }
 
+            if (i == 5)
+            {
+                // this will cause the predicate to be ignored in favor of scanning everything as a wide row
+                ConfigHelper.setInputColumnFamily(job.getConfiguration(), KEYSPACE, COLUMN_FAMILY, true);
+            }
+
             ConfigHelper.setOutputInitialAddress(job.getConfiguration(), "localhost");
             ConfigHelper.setOutputPartitioner(job.getConfiguration(), "RandomPartitioner");
 


http://git-wip-us.apache.org/repos/asf/cassandra/blob/fba541c0/examples/hadoop_word_count/src/WordCountSetup.java
----------------------------------------------------------------------
diff --git a/examples/hadoop_word_count/src/WordCountSetup.java b/examples/hadoop_word_count/src/WordCountSetup.java
index 66476aa..e8711b2 100644
--- a/examples/hadoop_word_count/src/WordCountSetup.java
+++ b/examples/hadoop_word_count/src/WordCountSetup.java
@@ -36,7 +36,7 @@ public class WordCountSetup
 {
     private static final Logger logger = LoggerFactory.getLogger(WordCountSetup.class);
 
-    public static final int TEST_COUNT = 5;
+    public static final int TEST_COUNT = 6;
 
     public static void main(String[] args) throws Exception
     {


http://git-wip-us.apache.org/repos/asf/cassandra/blob/fba541c0/interface/cassandra.thrift
----------------------------------------------------------------------
diff --git a/interface/cassandra.thrift b/interface/cassandra.thrift
index 08b776e..211c4c8 100644
--- a/interface/cassandra.thrift
+++ b/interface/cassandra.thrift
@@ -46,7 +46,7 @@ namespace rb CassandraThrift
 #   for every edit that doesn't result in a change to major/minor.
 #
 # See the Semantic Versioning Specification (SemVer) http://semver.org.
-const string VERSION = "19.26.0"
+const string VERSION = "19.27.0"
 
 
 #
@@ -536,6 +536,15 @@ service Cassandra {
     throws (1:InvalidRequestException ire, 2:UnavailableException ue, 3:TimedOutException te),
 
   /**
+    returns a range of columns, wrapping to the next rows if necessary to collect max_results.
+  */
+  list<KeySlice> get_paged_slice(1:required string column_family,
+                                 2:required KeyRange range,
+                                 3:required binary start_column,
+                                 4:required ConsistencyLevel consistency_level=ConsistencyLevel.ONE)
+    throws (1:InvalidRequestException ire, 2:UnavailableException ue, 3:TimedOutException te),
+
+  /**
    Returns the subset of columns specified in SlicePredicate for the rows matching the IndexClause
    @Deprecated; use get_range_slices instead with range.row_filter specified
   */
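Usage note: in the WordCount changes above, only the i == 5 job opts into the wide-row path, via the new four-argument ConfigHelper.setInputColumnFamily overload. A minimal sketch of a job configuration using that path is below; it relies only on calls visible in this diff, the keyspace and column family names ("Keyspace1", "WideRows") and the class name are placeholders, and the rpc address/port/partitioner, mapper, reducer, and output settings are omitted for brevity.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;

    import org.apache.cassandra.hadoop.ColumnFamilyInputFormat;
    import org.apache.cassandra.hadoop.ConfigHelper;

    public class WideRowJobSketch
    {
        public static void main(String[] args) throws Exception
        {
            Job job = new Job(new Configuration(), "widerow-wordcount");
            job.setJarByClass(WideRowJobSketch.class);
            job.setInputFormatClass(ColumnFamilyInputFormat.class);

            // New 4-arg overload: the trailing 'true' requests wide-row input, which
            // (per the comment in WordCount.java above) causes any slice predicate to
            // be ignored in favor of scanning everything in the row.
            ConfigHelper.setInputColumnFamily(job.getConfiguration(), "Keyspace1", "WideRows", true);

            // Batch size used when paging through columns; WordCount sets 99 so the
            // page size does not divide the row count evenly and exercises the paging logic.
            ConfigHelper.setRangeBatchSize(job.getConfiguration(), 99);

            // Remaining input/output configuration (addresses, partitioner, mapper,
            // reducer, output format) would be set here as in the full WordCount example.
        }
    }

Note that the mapper signature in WordCount.java is unchanged: map(ByteBuffer key, SortedMap<ByteBuffer, IColumn> columns, Context context) now iterates over columns.values() instead of looking up a single configured column.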

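The paging behind wide-row input is exposed through the get_paged_slice call added to cassandra.thrift above. A hypothetical client-side sketch of the generated Thrift method follows, shown only to illustrate the argument order from the IDL; the host, port (9160), keyspace ("wordcount") and column family ("input_words") are assumptions, and error handling is omitted.

    import java.nio.ByteBuffer;
    import java.util.List;

    import org.apache.cassandra.thrift.Cassandra;
    import org.apache.cassandra.thrift.ConsistencyLevel;
    import org.apache.cassandra.thrift.KeyRange;
    import org.apache.cassandra.thrift.KeySlice;
    import org.apache.cassandra.utils.ByteBufferUtil;
    import org.apache.thrift.protocol.TBinaryProtocol;
    import org.apache.thrift.transport.TFramedTransport;
    import org.apache.thrift.transport.TSocket;
    import org.apache.thrift.transport.TTransport;

    public class PagedSliceSketch
    {
        public static void main(String[] args) throws Exception
        {
            TTransport transport = new TFramedTransport(new TSocket("localhost", 9160));
            transport.open();
            Cassandra.Client client = new Cassandra.Client(new TBinaryProtocol(transport));
            client.set_keyspace("wordcount");  // placeholder keyspace

            // Empty start/end keys cover the whole ring, as with get_range_slices;
            // per the IDL comment, results wrap to the next rows as necessary.
            KeyRange range = new KeyRange(100);
            range.setStart_key(ByteBufferUtil.EMPTY_BYTE_BUFFER);
            range.setEnd_key(ByteBufferUtil.EMPTY_BYTE_BUFFER);

            // start_column says where to resume within the first row; an empty
            // buffer starts from the first column.
            List<KeySlice> page = client.get_paged_slice("input_words",  // placeholder column family
                                                         range,
                                                         ByteBufferUtil.EMPTY_BYTE_BUFFER,
                                                         ConsistencyLevel.ONE);
            System.out.println("fetched " + page.size() + " key slices");

            transport.close();
        }
    }

In the Hadoop path this RPC is driven by ColumnFamilyRecordReader (also touched by this commit), so most users would go through ColumnFamilyInputFormat/ConfigHelper rather than calling it directly.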