Hi,

I wrote my MapReduce code against the Hadoop 0.20.1 and HBase 0.20.2 APIs; it
reads a txt file from HDFS and writes to an HBase table.
I tested 2 versions:


Version 1 looks like the following:

Maps:
    public static class TableMapper extends Mapper<Object, Text, Text, Text> {

        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {

            ...

            context.write(keys, values);

        }

    }


Reduce:

    public static class ReduceToTable extends
            TableReducer<Text, Text, NullWritable> {


        public void reduce(Text Key, Iterable<Text> RValues, Context context)
                throws IOException, InterruptedException {

            Iterator<Text> Values = RValues.iterator();

            ...

            Put put = new Put(Bytes.toBytes(rowKey));
            put.add(colName.getBytes(), ts, contents.toString().getBytes());

            context.write(NullWritable.get(), put);

        }

    }

And the JOB:

    public static void main(String[] args) throws Exception {

        HBaseConfiguration config = new HBaseConfiguration();

        HBaseAdmin admin = new HBaseAdmin(config);

        if (!admin.tableExists(tablename)) {
            System.out.println("create new table: " + tablename);
            admin.createTable(descXY);
            isNewTable = true;
        }


        Configuration c = new Configuration();
        final Date sTime = new Date();
        final SimpleDateFormat df = new SimpleDateFormat("yyyyMMddhhmm");
        final String ssTime = df.format(sTime);
        c.set(TableOutputFormat.OUTPUT_TABLE, tablename);
        Job job = new Job(c, NAME);

        job.setNumReduceTasks(numReduceTasks);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TableOutputFormat.class);
        job.setJarByClass(TrailsIndexIncrCreator.class);
        job.setMapperClass(TableMapper.class);
        job.setReducerClass(ReduceToTable.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        FileInputFormat.setInputPaths(job, new Path(srcFilePath));

        System.exit(job.waitForCompletion(true) ? 0 : 1);

    }
}

--------------------------------------------------------------

And Version 2 looks like the following:

Maps:
    public static class TableMapper extends Mapper<Object, Text, Text, Text> {

        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {

            ...

            context.write(keys, values);

        }

    }


Reduce:

    public static class ReduceToTable extends
            TableReducer<Text, Text, ImmutableBytesWritable> {


        public void reduce(Text Key, Iterable<Text> RValues, Context context)
                throws IOException, InterruptedException {

            Iterator<Text> Values = RValues.iterator();

            ...

            Put put = new Put(Bytes.toBytes(rowKey));
            put.add(colName.getBytes(), ts, contents.toString().getBytes());

            context.write(new ImmutableBytesWritable(tkey.getBytes()), put);

        }

    }

And the JOB:

    public static void main(String[] args) throws Exception {

        HBaseConfiguration config = new HBaseConfiguration();

        HBaseAdmin admin = new HBaseAdmin(config);

        if (!admin.tableExists(tablename)) {
            System.out.println("create new table: " + tablename);
            admin.createTable(descXY);
            isNewTable = true;
        }


        Configuration c = new Configuration();
        final Date sTime = new Date();
        final SimpleDateFormat df = new SimpleDateFormat("yyyyMMddhhmm");
        final String ssTime = df.format(sTime);
        Job job = new Job(c, NAME);
        job.setNumReduceTasks(numReduceTasks);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TableOutputFormat.class);
        job.setJarByClass(TrailsIndexIncrCreator.class);
        job.setMapperClass(TableMapper.class);
        job.setReducerClass(ReduceToTable.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        FileInputFormat.setInputPaths(job, new Path(srcFilePath));
        TableMapReduceUtil.initTableReducerJob(tablename, ReduceToTable.class, job);

        System.exit(job.waitForCompletion(true) ? 0 : 1);

    }
}
--------------------------------------------------------------------------------------

These 2 versions differ in only 2 places:
1. Version 1 uses NullWritable and Version 2 uses ImmutableBytesWritable as the
output key type in the Reducer class.
2. Version 1 uses c.set(TableOutputFormat.OUTPUT_TABLE, tablename) to set up the
job, and Version 2 uses TableMapReduceUtil.initTableReducerJob.
Both of them complete successfully. However, in my test, Version 1 completes the
job within 8 mins but Version 2 needs more than 2.5 hours.
I used the same input data (about 580 MB) for both tests, and I wonder why the
performance difference is so huge.
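For reference, here is a minimal, self-contained sketch of the Version 2 reducer
key style, i.e. emitting an ImmutableBytesWritable that wraps the Put's row key.
The column family "f", qualifier "q", and the way the row key is derived from the
reduce input key are placeholder assumptions, not taken from my real code; only
the final context.write() line is the part that actually differs from Version 1:

    import java.io.IOException;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.TableReducer;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.io.Text;

    public class SketchReduceToTable extends
            TableReducer<Text, Text, ImmutableBytesWritable> {

        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {

            // Row key derived from the reduce input key (an assumption;
            // the real code builds rowKey in the elided "..." part).
            byte[] rowKey = Bytes.toBytes(key.toString());
            Put put = new Put(rowKey);

            for (Text v : values) {
                // "f" / "q" are placeholder family / qualifier names.
                put.add(Bytes.toBytes("f"), Bytes.toBytes("q"),
                        Bytes.toBytes(v.toString()));
            }

            // Version 2 style: the output key wraps the same row key bytes.
            context.write(new ImmutableBytesWritable(rowKey), put);
        }
    }

As far as I can tell, TableOutputFormat writes the Put using the row key stored
inside the Put itself, so the choice of reducer output key type should not by
itself change what ends up in the table.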
Does anybody have any ideas?

Thanks a lot!

stchu
