Author: johan
Date: Tue Feb 23 14:57:27 2010
New Revision: 915364

URL: http://svn.apache.org/viewvc?rev=915364&view=rev
Log:
Expose the Hadoop input split size to the user. Patch by johan, review by 
jbellis. CASSANDRA-823

Modified:
    incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java

Modified: incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java?rev=915364&r1=915363&r2=915364&view=diff
==============================================================================
--- incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java (original)
+++ incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java Tue Feb 23 14:57:27 2010
@@ -32,6 +32,7 @@
     private static final String KEYSPACE_CONFIG = "cassandra.input.keyspace";
     private static final String COLUMNFAMILY_CONFIG = "cassandra.input.columnfamily";
     private static final String PREDICATE_CONFIG = "cassandra.input.predicate";
+    private static final String INPUT_SPLIT_SIZE_CONFIG = "cassandra.input.split.size";
 
     private static final Logger logger = Logger.getLogger(StorageService.class);
 
@@ -62,6 +63,19 @@
         conf.set(COLUMNFAMILY_CONFIG, columnFamily);
     }
 
+    /**
+     * Set the size of the input split.
+     * This affects the number of map tasks created; if the split size is too small,
+     * the overhead of each map will take up the bulk of the job time.
+     *  
+     * @param job Job you are about to run.
+     * @param splitsize Size of the input split
+     */
+    public static void setInputSplitSize(Job job, int splitsize)
+    {
+        job.getConfiguration().setInt(INPUT_SPLIT_SIZE_CONFIG, splitsize);
+    }
+    
     public static void setSlicePredicate(Job job, SlicePredicate predicate)
     {
         Configuration conf = job.getConfiguration();
@@ -122,13 +136,15 @@
         // canonical ranges and nodes holding replicas
         List<TokenRange> masterRangeNodes = getRangeMap();
 
+        int splitsize = context.getConfiguration().getInt(INPUT_SPLIT_SIZE_CONFIG, 16384);
+        
         // canonical ranges, split into pieces:
         // for each range, pick a live owner and ask it to compute bite-sized splits
         // TODO parallelize this thread-per-range
         Map<TokenRange, List<String>> splitRanges = new HashMap<TokenRange, List<String>>();
         for (TokenRange range : masterRangeNodes)
         {
-            splitRanges.put(range, getSubSplits(range));
+            splitRanges.put(range, getSubSplits(range, splitsize));
         }
 
         // turn the sub-ranges into InputSplits
@@ -143,7 +159,7 @@
             for ( ; i < tokens.size(); i++)
             {
                 ColumnFamilySplit split = new ColumnFamilySplit(keyspace, columnFamily, predicate, tokens.get(i - 1), tokens.get(i), endpoints);
-                logger.info("adding " + split);
+                logger.debug("adding " + split);
                 splits.add(split);
             }
         }
@@ -152,7 +168,7 @@
         return splits;
     }
 
-    private List<String> getSubSplits(TokenRange range) throws IOException
+    private List<String> getSubSplits(TokenRange range, int splitsize) throws IOException
     {
         // TODO handle failure of range replicas & retry
         TSocket socket = new TSocket(range.endpoints.get(0),
@@ -170,7 +186,7 @@
         List<String> splits;
         try
         {
-            splits = client.describe_splits(range.start_token, range.end_token, 128); // TODO make split size configurable
+            splits = client.describe_splits(range.start_token, range.end_token, splitsize);
         }
         catch (TException e)
         {
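
For reference, a minimal sketch of how a client job might use the new setter. This is not part of the patch; the job name, the chosen split size, and the overall job wiring are illustrative assumptions, and the keyspace, column family, predicate, and mapper/reducer setup that a real job needs are omitted.

    import org.apache.cassandra.hadoop.ColumnFamilyInputFormat;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;

    public class SplitSizeExample
    {
        public static void main(String[] args) throws Exception
        {
            // Hypothetical job setup; keyspace, column family, predicate, and
            // mapper/reducer configuration are left out of this sketch.
            Job job = new Job(new Configuration(), "cassandra-split-size-example");
            job.setInputFormatClass(ColumnFamilyInputFormat.class);

            // Ask each range owner for sub-splits of roughly 65536 keys instead
            // of the 16384 default used when the setting is absent, yielding
            // fewer, larger map tasks.
            ColumnFamilyInputFormat.setInputSplitSize(job, 65536);

            job.waitForCompletion(true);
        }
    }

Larger values reduce the per-map startup overhead described in the javadoc above, at the cost of coarser work distribution across the cluster.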

