This is very close to the example already in the javadoc, except it uses
(Bytes, BatchUpdate) instead of (Text, MapWritable), and I find it to be
the easiest way to get people started/motivated with HBase.
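
For reference, the mapper below expects space-delimited input lines where
the first token is the row key and the second is the value to store, e.g.
(made-up sample data):

  row1 37
  row2 12

Each input line becomes one BatchUpdate that writes the value into the
count: column of that row.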


package org.apache.hadoop.hbase.mapred;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.BatchUpdate;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class SampleUploader extends MapReduceBase
    implements Mapper<LongWritable, Text, ImmutableBytesWritable, BatchUpdate>,
    Tool {
  private static final String NAME = "SampleUploader";
  private Configuration conf;

  public JobConf createSubmittableJob(String[] args) {
    JobConf c = new JobConf(getConf(), SampleUploader.class);
    c.setJobName(NAME);
    FileInputFormat.setInputPaths(c, new Path(args[0]));
    c.setMapperClass(this.getClass());
    c.setMapOutputKeyClass(ImmutableBytesWritable.class);
    c.setMapOutputValueClass(BatchUpdate.class);
    c.setReducerClass(TableUploader.class);
    TableReduce.initJob(args[1], TableUploader.class, c);
    return c;
  }

  public void map(LongWritable k, Text v,
    OutputCollector<ImmutableBytesWritable, BatchUpdate> output, Reporter r)
  throws IOException {
    // Lines are space-delimited; the first item is the row key and the
    // second is the value to store in the count: column.
    String tmp = v.toString();
    if (tmp.length() == 0) {
      return;
    }
    String [] splits = tmp.split(" ");
    String row = splits[0];
    BatchUpdate bu = new BatchUpdate(row);
    bu.put("count:", Bytes.toBytes(splits[1]));
    r.setStatus("Map emitting " + row + " for record " + k.toString());
    output.collect(new ImmutableBytesWritable(Bytes.toBytes(row)), bu);
  }

  public static class TableUploader
      extends TableReduce<ImmutableBytesWritable, BatchUpdate> {

    @Override
    public void reduce(ImmutableBytesWritable key, Iterator<BatchUpdate> values,
        OutputCollector<ImmutableBytesWritable, BatchUpdate> output,
        @SuppressWarnings("unused") Reporter reporter)
        throws IOException {
      // Pass each BatchUpdate straight through to the table.
      while (values.hasNext()) {
        output.collect(key, values.next());
      }
    }
  }


  static int printUsage() {
    System.out.println(NAME + " <input> <table_name>");
    return -1;
  }

  public int run(String[] args) throws Exception {
    // Make sure there are exactly 2 parameters left.
    if (args.length != 2) {
      System.out.println("ERROR: Wrong number of parameters: " +
        args.length + " instead of 2.");
      return printUsage();
    }
    JobClient.runJob(createSubmittableJob(args));
    return 0;
  }
  System.out.println("ERROR: Wrong number of parameters: " +
        args.length + " instead of 2.");
      return printUsage();
    }
    JobClient.runJob(createSubmittableJob(args));
    return 0;
  }

  public Configuration getConf() {
    return this.conf;
  }

  public void setConf(final Configuration c) {
    this.conf = c;
  }

  public static void main(String[] args) throws Exception {
    int errCode = ToolRunner.run(new Configuration(), new SampleUploader(),
      args);
    System.exit(errCode);
  }
}
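
To try it out, an invocation along these lines should work (the jar name
and paths are hypothetical; the target table must already exist with a
count column family, since the mapper writes to count:):

  hadoop jar sample-uploader.jar org.apache.hadoop.hbase.mapred.SampleUploader \
      input.txt myTable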
