Updated Branches:
  refs/heads/trunk 087d24a9d -> fba541c0c

Add wide row support to ColumnFamilyInputFormat

Patch by jbellis; reviewed by tjake for CASSANDRA-3264


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/fba541c0
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/fba541c0
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/fba541c0

Branch: refs/heads/trunk
Commit: fba541c0c33a2e966c0cb23cb1cdabce9c330d26
Parents: 087d24a
Author: T Jake Luciani <[email protected]>
Authored: Wed Jan 25 15:43:55 2012 -0500
Committer: T Jake Luciani <[email protected]>
Committed: Wed Jan 25 15:43:55 2012 -0500

----------------------------------------------------------------------
 CHANGES.txt                                        |    1 +
 examples/hadoop_word_count/src/WordCount.java      |   42 +-
 examples/hadoop_word_count/src/WordCountSetup.java |    2 +-
 interface/cassandra.thrift                         |   11 +-
 .../org/apache/cassandra/thrift/Cassandra.java     | 3273 ++++++++++-----
 .../org/apache/cassandra/thrift/Constants.java     |    2 +-
 .../cassandra/hadoop/ColumnFamilyRecordReader.java |  243 +-
 .../org/apache/cassandra/hadoop/ConfigHelper.java  |   31 +-
 .../apache/cassandra/thrift/CassandraServer.java   |   55 +
 9 files changed, 2606 insertions(+), 1054 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/fba541c0/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 03d2e69..74ee0e9 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -56,6 +56,7 @@
  * Make CFMetaData conversions to/from thrift/native schema inverses
    (CASSANDRA_3559)
  * Add initial code for CQL 3.0-beta (CASSANDRA-3781)
+ * Add wide row support for ColumnFamilyInputFormat (CASSANDRA-3264)
 
 
 1.0.8

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fba541c0/examples/hadoop_word_count/bin/word_count_counters
----------------------------------------------------------------------
diff --git a/examples/hadoop_word_count/bin/word_count_counters 
b/examples/hadoop_word_count/bin/word_count_counters
old mode 100644
new mode 100755

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fba541c0/examples/hadoop_word_count/src/WordCount.java
----------------------------------------------------------------------
diff --git a/examples/hadoop_word_count/src/WordCount.java 
b/examples/hadoop_word_count/src/WordCount.java
index e1c70bb..d3cee0e 100644
--- a/examples/hadoop_word_count/src/WordCount.java
+++ b/examples/hadoop_word_count/src/WordCount.java
@@ -25,8 +25,6 @@ import org.apache.cassandra.hadoop.ColumnFamilyOutputFormat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static com.google.common.base.Charsets.UTF_8;
-
 import org.apache.cassandra.db.IColumn;
 import org.apache.cassandra.hadoop.ColumnFamilyInputFormat;
 import org.apache.cassandra.hadoop.ConfigHelper;
@@ -81,22 +79,28 @@ public class WordCount extends Configured implements Tool
         protected void setup(org.apache.hadoop.mapreduce.Mapper.Context 
context)
         throws IOException, InterruptedException
         {
-            sourceColumn = 
ByteBufferUtil.bytes(context.getConfiguration().get(CONF_COLUMN_NAME));
         }
 
         public void map(ByteBuffer key, SortedMap<ByteBuffer, IColumn> 
columns, Context context) throws IOException, InterruptedException
         {
-            IColumn column = columns.get(sourceColumn);
-            if (column == null)
-                return;
-            String value = ByteBufferUtil.string(column.value());
-            logger.debug("read " + key + ":" + value + " from " + 
context.getInputSplit());
-
-            StringTokenizer itr = new StringTokenizer(value);
-            while (itr.hasMoreTokens())
+            for (IColumn column : columns.values())
             {
-                word.set(itr.nextToken());
-                context.write(word, one);
+                String name  = ByteBufferUtil.string(column.name());
+                String value = null;
+                
+                if (name.contains("int"))
+                    value = 
String.valueOf(ByteBufferUtil.toInt(column.value()));
+                else
+                    value = ByteBufferUtil.string(column.value());
+                               
+                System.err.println("read " + ByteBufferUtil.string(key) + ":" 
+name + ":" + value + " from " + context.getInputSplit());
+
+                StringTokenizer itr = new StringTokenizer(value);
+                while (itr.hasMoreTokens())
+                {
+                    word.set(itr.nextToken());
+                    context.write(word, one);
+                }
             }
         }
     }
@@ -155,10 +159,12 @@ public class WordCount extends Configured implements Tool
         }
         logger.info("output reducer type: " + outputReducerType);
 
+        // use a smaller page size that doesn't divide the row count evenly to 
exercise the paging logic better
+        ConfigHelper.setRangeBatchSize(getConf(), 99);
+
         for (int i = 0; i < WordCountSetup.TEST_COUNT; i++)
         {
             String columnName = "text" + i;
-            getConf().set(CONF_COLUMN_NAME, columnName);
 
             Job job = new Job(getConf(), "wordcount");
             job.setJarByClass(WordCount.class);
@@ -184,6 +190,7 @@ public class WordCount extends Configured implements Tool
                 job.setOutputFormatClass(ColumnFamilyOutputFormat.class);
 
                 ConfigHelper.setOutputColumnFamily(job.getConfiguration(), 
KEYSPACE, OUTPUT_COLUMN_FAMILY);
+                job.getConfiguration().set(CONF_COLUMN_NAME, "sum");
             }
 
             job.setInputFormatClass(ColumnFamilyInputFormat.class);
@@ -194,12 +201,19 @@ public class WordCount extends Configured implements Tool
             ConfigHelper.setInputColumnFamily(job.getConfiguration(), 
KEYSPACE, COLUMN_FAMILY);
             SlicePredicate predicate = new 
SlicePredicate().setColumn_names(Arrays.asList(ByteBufferUtil.bytes(columnName)));
             ConfigHelper.setInputSlicePredicate(job.getConfiguration(), 
predicate);
+
             if (i == 4)
             {
                 IndexExpression expr = new 
IndexExpression(ByteBufferUtil.bytes("int4"), IndexOperator.EQ, 
ByteBufferUtil.bytes(0));
                 ConfigHelper.setInputRange(job.getConfiguration(), 
Arrays.asList(expr));
             }
 
+            if (i == 5)
+            {
+                // this will cause the predicate to be ignored in favor of 
scanning everything as a wide row
+                ConfigHelper.setInputColumnFamily(job.getConfiguration(), 
KEYSPACE, COLUMN_FAMILY, true);
+            }
+
             ConfigHelper.setOutputInitialAddress(job.getConfiguration(), 
"localhost");
             ConfigHelper.setOutputPartitioner(job.getConfiguration(), 
"RandomPartitioner");
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fba541c0/examples/hadoop_word_count/src/WordCountSetup.java
----------------------------------------------------------------------
diff --git a/examples/hadoop_word_count/src/WordCountSetup.java 
b/examples/hadoop_word_count/src/WordCountSetup.java
index 66476aa..e8711b2 100644
--- a/examples/hadoop_word_count/src/WordCountSetup.java
+++ b/examples/hadoop_word_count/src/WordCountSetup.java
@@ -36,7 +36,7 @@ public class WordCountSetup
 {
     private static final Logger logger = 
LoggerFactory.getLogger(WordCountSetup.class);
 
-    public static final int TEST_COUNT = 5;
+    public static final int TEST_COUNT = 6;
 
     public static void main(String[] args) throws Exception
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fba541c0/interface/cassandra.thrift
----------------------------------------------------------------------
diff --git a/interface/cassandra.thrift b/interface/cassandra.thrift
index 08b776e..211c4c8 100644
--- a/interface/cassandra.thrift
+++ b/interface/cassandra.thrift
@@ -46,7 +46,7 @@ namespace rb CassandraThrift
 #           for every edit that doesn't result in a change to major/minor.
 #
 # See the Semantic Versioning Specification (SemVer) http://semver.org.
-const string VERSION = "19.26.0"
+const string VERSION = "19.27.0"
 
 
 #
@@ -536,6 +536,15 @@ service Cassandra {
                  throws (1:InvalidRequestException ire, 2:UnavailableException 
ue, 3:TimedOutException te),
 
   /**
+   returns a range of columns, wrapping to the next rows if necessary to 
collect max_results.
+  */
+  list<KeySlice> get_paged_slice(1:required string column_family,
+                                 2:required KeyRange range,
+                                 3:required binary start_column,
+                                 4:required ConsistencyLevel 
consistency_level=ConsistencyLevel.ONE)
+                 throws (1:InvalidRequestException ire, 2:UnavailableException 
ue, 3:TimedOutException te),
+
+  /**
     Returns the subset of columns specified in SlicePredicate for the rows 
matching the IndexClause
     @Deprecated; use get_range_slices instead with range.row_filter specified
     */

Reply via email to