Hi,
I wrote my mapreduce code by Hadoop0.20.1 and HBase 0.20.2 API, which reads
HDFS txt file and write to HBase table.
I test by 2 versions:
Version 1 looks likes the follows:
Maps:
public static class TableMapper extends Mapper<Object, Text, Text, Text>
{
public void map(Object key, Text value, Context context)
throws IOException {
...
context.write(keys, values);
}
}
}
Reduce:
public static class ReduceToTable extends
TableReducer<Text, Text, NullWritable> {
public void reduce(Text Key, Iterable<Text> RValues, Context
context)
throws IOException, InterruptedException {
Iterator<Text> Values = RValues.iterator();
...
Put put = new Put(Bytes.toBytes(rowKey));
put.add(colName.getBytes(), ts, contents.toString().getBytes());
context.write(NullWritable.get(), put);
}
}
And the JOB:
public static void main(String[] args) throws Exception {
HBaseConfiguration config = new HBaseConfiguration();
HBaseAdmin admin = new HBaseAdmin(config);
if (!admin.tableExists(tablename)) {
System.out.println("create new table: " + tablename);
admin.createTable(descXY);
isNewTable = true;
}
Configuration c = new Configuration();
final Date sTime = new Date();
final SimpleDateFormat df = new SimpleDateFormat("yyyyMMddhhmm");
final String ssTime = df.format(sTime);
c.set(TableOutputFormat.OUTPUT_TABLE, tablename);
Job job = new Job(c, NAME);
job.setNumReduceTasks(numReduceTasks);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TableOutputFormat.class);
job.setJarByClass(TrailsIndexIncrCreator.class);
job.setMapperClass(TableMapper.class);
job.setReducerClass(ReduceToTable.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
FileInputFormat.setInputPaths(job, new Path(srcFilePath));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
--------------------------------------------------------------
And Version 2 looks likes the follows:
Maps:
public static class TableMapper extends Mapper<Object, Text, Text, Text>
{
public void map(Object key, Text value, Context context)
throws IOException {
...
context.write(keys, values);
}
}
}
Reduce:
public static class ReduceToTable extends
TableReducer<Text, Text, ImmutableBytesWritable> {
public void reduce(Text Key, Iterable<Text> RValues, Context
context)
throws IOException, InterruptedException {
Iterator<Text> Values = RValues.iterator();
...
Put put = new Put(Bytes.toBytes(rowKey));
put.add(colName.getBytes(), ts, contents.toString().getBytes());
context.write(tkey.getBytes(), put);
}
}
And the JOB:
public static void main(String[] args) throws Exception {
HBaseConfiguration config = new HBaseConfiguration();
HBaseAdmin admin = new HBaseAdmin(config);
if (!admin.tableExists(tablename)) {
System.out.println("create new table: " + tablename);
admin.createTable(descXY);
isNewTable = true;
}
Configuration c = new Configuration();
final Date sTime = new Date();
final SimpleDateFormat df = new SimpleDateFormat("yyyyMMddhhmm");
final String ssTime = df.format(sTime);
Job job = new Job(c, NAME);
job.setNumReduceTasks(numReduceTasks);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TableOutputFormat.class);
job.setJarByClass(TrailsIndexIncrCreator.class);
job.setMapperClass(TableMapper.class);
job.setReducerClass(ReduceToTable.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
FileInputFormat.setInputPaths(job, new Path(srcFilePath));
TableMapReduceUtil.initTableReducerJob(tablename,
ReduceToTable.class,job);
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
--------------------------------------------------------------------------------------
These 2 versions only different in 2 parts: 1. Version 1 use nullWritable
and version 2 use ImmutableBytesWritable in Reducer class.
2. Version 1 use c.set(TableOutputFormat.OUTPUT_TABLE, tablename) to set job
and Version 2 used TableMapReduceUtil.initTableReducerJob.
Both of them can work completed. However, in my test, Version 1 complete job
within 8 mins but v2 need more than 2.5 hours.
I used the same input data: about 580MB for my test. I was wonder about why
the difference of performace is so huge.
Anybody has ideas?
Thanks a lot!
stchu