Dear Wiki user,

You have subscribed to a wiki page or wiki category on "Hadoop Wiki" for change 
notification.

The following page has been changed by allenday:
http://wiki.apache.org/hadoop/Hbase/MapReduce

The comment on the change is:
adding the BulkImport example

------------------------------------------------------------------------------
  
  Reading from hbase, the !TableInputFormat asks hbase for the list of regions 
and makes a map-per-region.  Writing, it may make sense to avoid the reduce 
step and write back into hbase from inside your map.  You'd do this when your 
job does not need the sort and collation that MR does inside in its reduce; on 
insert, hbase sorts so no point double-sorting (and shuffling data around your 
MR cluster) unless you need to.  If you do not need the reduce, you might just 
have your map emit counts of records processed just so the framework can emit 
that nice report of records processed when the job is done.  See example code 
below.  If running the reduce step makes sense in  your case, its better to 
have lots of reducers so load is spread across the hbase cluster.
  
+ == Example to bulk import/load a text file into an HTable ==
+ 
+ Here's a sample program from [http://spicylogic.com/allenday/blog Allen Day] 
that takes an HDFS text file path and an HBase table name as inputs, and loads 
the contents of the text file to the table.
+ 
+ {{{
+ package com.spicylogic.hbase;
+ import java.io.IOException;
+ import java.util.Iterator;
+ import java.util.Map;
+ 
+ import org.apache.hadoop.conf.Configuration;
+ import org.apache.hadoop.fs.Path;
+ import org.apache.hadoop.hbase.HBaseConfiguration;
+ import org.apache.hadoop.hbase.client.HTable;
+ import org.apache.hadoop.hbase.io.BatchUpdate;
+ import org.apache.hadoop.hbase.mapred.TableOutputFormat;
+ import org.apache.hadoop.hbase.mapred.TableReduce;
+ import org.apache.hadoop.io.LongWritable;
+ import org.apache.hadoop.io.MapWritable;
+ import org.apache.hadoop.io.Text;
+ import org.apache.hadoop.mapred.JobClient;
+ import org.apache.hadoop.mapred.JobConf;
+ import org.apache.hadoop.mapred.MapReduceBase;
+ import org.apache.hadoop.mapred.Mapper;
+ import org.apache.hadoop.mapred.OutputCollector;
+ import org.apache.hadoop.mapred.Reducer;
+ import org.apache.hadoop.mapred.Reporter;
+ import org.apache.hadoop.mapred.lib.NullOutputFormat;
+ import org.apache.hadoop.util.Tool;
+ import org.apache.hadoop.util.ToolRunner;
+ 
+ public class BulkImport implements Tool {
+   private static final String NAME = "BulkImport";
+   private Configuration conf;
+ 
+   public static class InnerMap extends MapReduceBase implements 
Mapper<LongWritable, Text, Text, Text> {
+     private HTable table;
+     private HBaseConfiguration HBconf;
+ 
+     public void map(LongWritable key, Text value, OutputCollector<Text, Text> 
output, Reporter reporter) throws IOException {
+       if ( table == null )
+         throw new IOException("table is null");
+       
+       String [] splits = value.toString().split("\t");
+       if ( splits.length != 4 )
+         return;
+ 
+       String rowID     = splits[0];
+       int timestamp    = Integer.parseInt( splits[1] );
+       String colID     = splits[2];
+       String cellValue = splits[3];
+ 
+       reporter.setStatus("Map emitting cell for row='" + rowID + "', 
column='" + colID + "', time='" + timestamp + "'");
+ 
+       BatchUpdate bu = new BatchUpdate( rowID );
+       if ( timestamp > 0 )
+         bu.setTimestamp( timestamp );
+ 
+       bu.put(colID, cellValue.getBytes());      
+       table.commit( bu );      
+     }
+     public void configure(JobConf job) {
+       HBconf = new HBaseConfiguration();
+       try {
+         table = new HTable( HBconf, job.get("input.table") );
+       } catch (IOException e) {
+         // TODO Auto-generated catch block
+         e.printStackTrace();
+       }
+     }
+   }
+   
+   
+   public JobConf createSubmittableJob(String[] args) {
+     JobConf c = new JobConf(getConf(), BulkImport.class);
+     c.setJobName(NAME);
+     c.setInputPath(new Path(args[0]));
+ 
+     c.set("input.table", args[1]);
+     c.setMapperClass(InnerMap.class);
+     c.setNumReduceTasks(0);
+     c.setOutputFormat(NullOutputFormat.class);
+     return c;
+   }
+   
+   static int printUsage() {
+     System.err.println("Usage: " + NAME + " <input> <table_name>");
+     System.err.println("\twhere <input> is a tab-delimited text file with 4 
columns.");
+     System.err.println("\t\tcolumn 1 = row ID");
+     System.err.println("\t\tcolumn 2 = timestamp (use a negative value for 
current time)");
+     System.err.println("\t\tcolumn 3 = column ID");
+     System.err.println("\t\tcolumn 4 = cell value");
+     return -1;
+   } 
+ 
+   public int run(@SuppressWarnings("unused") String[] args) throws Exception {
+     // Make sure there are exactly 3 parameters left.
+     if (args.length != 2) {
+       return printUsage();
+     }
+     JobClient.runJob(createSubmittableJob(args));
+     return 0;
+   }
+ 
+   public Configuration getConf() {
+     return this.conf;
+   } 
+ 
+   public void setConf(final Configuration c) {
+     this.conf = c;
+   }
+ 
+   public static void main(String[] args) throws Exception {
+     int errCode = ToolRunner.run(new Configuration(), new BulkImport(), args);
+     System.exit(errCode);
+   }
+ }
+ }}}
+ 
  == Example to map rows/column families between two HTables ==
  
- Here's some sample code from [http://spicylogic.com/allenday/blog Allen Day] 
that will iterate over all rows in one table for specified column families and 
insert those rows/columns to a second table.
+ Here another sample program from [http://spicylogic.com/allenday/blog Allen 
Day] that will iterate over all rows in one table for specified column families 
and insert those rows/columns to a second table.
  
  {{{
+ package com.spicylogic.hbase;
  import java.io.IOException;
  
  public class BulkCopy extends TableMap<Text, Text> implements Tool {

Reply via email to