Author: stack
Date: Thu Sep 17 03:25:35 2009
New Revision: 816038

URL: http://svn.apache.org/viewvc?rev=816038&view=rev
Log:
HBASE-1684 Backup (Export/Import) contrib tool for 0.20
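The tools register with the jar's ProgramDriver as "export" and "import". Usage is "Export <tablename> <outputdir>" to dump a table to sequence files and "Import <tablename> <inputdir>" to load those files back into a table (an illustrative programmatic sketch follows the diffs at the end of this message).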
Added:
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapreduce/Export.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapreduce/Import.java
Modified:
    hadoop/hbase/trunk/CHANGES.txt
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapreduce/Driver.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapreduce/RowCounter.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java

Modified: hadoop/hbase/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/CHANGES.txt?rev=816038&r1=816037&r2=816038&view=diff
==============================================================================
--- hadoop/hbase/trunk/CHANGES.txt (original)
+++ hadoop/hbase/trunk/CHANGES.txt Thu Sep 17 03:25:35 2009
@@ -59,6 +59,7 @@
    HBASE-1835 Add more delete tests
    HBASE-1574 Client and server APIs to do batch deletes
    HBASE-1833 hfile.main fixes
+   HBASE-1684 Backup (Export/Import) contrib tool for 0.20

  OPTIMIZATIONS

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapreduce/Driver.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapreduce/Driver.java?rev=816038&r1=816037&r2=816038&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapreduce/Driver.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapreduce/Driver.java Thu Sep 17 03:25:35 2009
@@ -34,6 +34,8 @@
     ProgramDriver pgd = new ProgramDriver();
     pgd.addClass(RowCounter.NAME, RowCounter.class,
       "Count rows in HBase table");
+    pgd.addClass(Export.NAME, Export.class, "Write table data to HDFS.");
+    pgd.addClass(Import.NAME, Import.class, "Import data written by Export.");
     pgd.driver(args);
   }
 }

Added: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapreduce/Export.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapreduce/Export.java?rev=816038&view=auto
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapreduce/Export.java (added)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapreduce/Export.java Thu Sep 17 03:25:35 2009
@@ -0,0 +1,120 @@
+/**
+ * Copyright 2009 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.hadoop.util.GenericOptionsParser;
+
+/**
+ * Export an HBase table.
+ * Writes content to sequence files up in HDFS. Use {@link Import} to read it
+ * back in again.
+ */
+public class Export {
+  final static String NAME = "export";
+
+  /**
+   * Mapper.
+   */
+  static class Exporter
+  extends TableMapper<ImmutableBytesWritable, Result> {
+    /**
+     * @param row The current table row key.
+     * @param value The columns.
+     * @param context The current context.
+     * @throws IOException When something is broken with the data.
+     * @see org.apache.hadoop.mapreduce.Mapper#map(KEYIN, VALUEIN,
+     *   org.apache.hadoop.mapreduce.Mapper.Context)
+     */
+    @Override
+    public void map(ImmutableBytesWritable row, Result value,
+      Context context)
+    throws IOException {
+      try {
+        context.write(row, value);
+      } catch (InterruptedException e) {
+        e.printStackTrace();
+      }
+    }
+  }
+
+  /**
+   * Sets up the actual job.
+   *
+   * @param conf The current configuration.
+   * @param args The command line parameters.
+   * @return The newly created job.
+   * @throws IOException When setting up the job fails.
+   */
+  public static Job createSubmittableJob(Configuration conf, String[] args)
+  throws IOException {
+    String tableName = args[0];
+    Path outputDir = new Path(args[1]);
+    Job job = new Job(conf, NAME + "_" + tableName);
+    job.setJarByClass(Exporter.class);
+    // TODO: Allow passing filter and subset of rows/columns.
+    TableMapReduceUtil.initTableMapperJob(tableName, new Scan(),
+      Exporter.class, null, null, job);
+    // No reducers. Just write straight to output files.
+    job.setNumReduceTasks(0);
+    job.setOutputFormatClass(SequenceFileOutputFormat.class);
+    job.setOutputKeyClass(ImmutableBytesWritable.class);
+    job.setOutputValueClass(Result.class);
+    FileOutputFormat.setOutputPath(job, outputDir);
+    return job;
+  }
+
+  /*
+   * @param errorMsg Error message. Can be null.
+   */
+  private static void usage(final String errorMsg) {
+    if (errorMsg != null && errorMsg.length() > 0) {
+      System.err.println("ERROR: " + errorMsg);
+    }
+    System.err.println("Usage: Export <tablename> <outputdir>");
+  }
+
+  /**
+   * Main entry point.
+   *
+   * @param args The command line parameters.
+   * @throws Exception When running the job fails.
+   */
+  public static void main(String[] args) throws Exception {
+    HBaseConfiguration conf = new HBaseConfiguration();
+    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
+    if (otherArgs.length < 2) {
+      usage("Wrong number of arguments: " + otherArgs.length);
+      System.exit(-1);
+    }
+    Job job = createSubmittableJob(conf, otherArgs);
+    System.exit(job.waitForCompletion(true)? 0 : 1);
+  }
+}
\ No newline at end of file

Added: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapreduce/Import.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapreduce/Import.java?rev=816038&view=auto
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapreduce/Import.java (added)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapreduce/Import.java Thu Sep 17 03:25:35 2009
@@ -0,0 +1,126 @@
+/**
+ * Copyright 2009 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.util.GenericOptionsParser;
+
+/**
+ * Import data written by {@link Export}.
+ */
+public class Import {
+  final static String NAME = "import";
+
+  /**
+   * Mapper that converts the {@link Result}s written by {@link Export} back
+   * into {@link Put}s for the target table.
+   */
+  static class Importer
+  extends TableMapper<ImmutableBytesWritable, Put> {
+    /**
+     * @param row The current table row key.
+     * @param value The columns.
+     * @param context The current context.
+     * @throws IOException When something is broken with the data.
+     * @see org.apache.hadoop.mapreduce.Mapper#map(KEYIN, VALUEIN,
+     *   org.apache.hadoop.mapreduce.Mapper.Context)
+     */
+    @Override
+    public void map(ImmutableBytesWritable row, Result value,
+      Context context)
+    throws IOException {
+      try {
+        context.write(row, resultToPut(row, value));
+      } catch (InterruptedException e) {
+        e.printStackTrace();
+      }
+    }
+
+    private static Put resultToPut(ImmutableBytesWritable key, Result result)
+    throws IOException {
+      Put put = new Put(key.get());
+      for (KeyValue kv : result.raw()) {
+        put.add(kv);
+      }
+      return put;
+    }
+  }
+
+  /**
+   * Sets up the actual job.
+   *
+   * @param conf The current configuration.
+   * @param args The command line parameters.
+   * @return The newly created job.
+   * @throws IOException When setting up the job fails.
+   */
+  public static Job createSubmittableJob(Configuration conf, String[] args)
+  throws IOException {
+    String tableName = args[0];
+    Path inputDir = new Path(args[1]);
+    Job job = new Job(conf, NAME + "_" + tableName);
+    job.setJarByClass(Importer.class);
+    FileInputFormat.setInputPaths(job, inputDir);
+    job.setInputFormatClass(SequenceFileInputFormat.class);
+    job.setMapperClass(Importer.class);
+    // No reducers. Just write straight to table. Call initTableReducerJob
+    // because it sets up the TableOutputFormat.
+    TableMapReduceUtil.initTableReducerJob(tableName, null, job);
+    job.setNumReduceTasks(0);
+    return job;
+  }
+
+  /*
+   * @param errorMsg Error message. Can be null.
+   */
+  private static void usage(final String errorMsg) {
+    if (errorMsg != null && errorMsg.length() > 0) {
+      System.err.println("ERROR: " + errorMsg);
+    }
+    System.err.println("Usage: Import <tablename> <inputdir>");
+  }
+
+  /**
+   * Main entry point.
+   *
+   * @param args The command line parameters.
+   * @throws Exception When running the job fails.
+   */
+  public static void main(String[] args) throws Exception {
+    HBaseConfiguration conf = new HBaseConfiguration();
+    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
+    if (otherArgs.length < 2) {
+      usage("Wrong number of arguments: " + otherArgs.length);
+      System.exit(-1);
+    }
+    Job job = createSubmittableJob(conf, otherArgs);
+    System.exit(job.waitForCompletion(true) ? 0 : 1);
+  }
+}
\ No newline at end of file

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapreduce/RowCounter.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapreduce/RowCounter.java?rev=816038&r1=816037&r2=816038&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapreduce/RowCounter.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapreduce/RowCounter.java Thu Sep 17 03:25:35 2009
@@ -22,15 +22,14 @@
 import java.io.IOException;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
 import org.apache.hadoop.util.GenericOptionsParser;
 
 /**
@@ -49,7 +48,7 @@
   extends TableMapper<ImmutableBytesWritable, Result> {
 
     /** Counter enumeration to count the actual rows. */
-    public static enum Counters { ROWS }
+    public static enum Counters {ROWS}
 
     /**
      * Maps the data.
@@ -72,7 +71,6 @@
       }
     }
   }
-  }
 
   /**
@@ -85,11 +83,12 @@
    */
   public static Job createSubmittableJob(Configuration conf, String[] args)
   throws IOException {
-    Job job = new Job(conf, NAME);
+    String tableName = args[0];
+    Job job = new Job(conf, NAME + "_" + tableName);
     job.setJarByClass(RowCounter.class);
     // Columns are space delimited
     StringBuilder sb = new StringBuilder();
-    final int columnoffset = 2;
+    final int columnoffset = 1;
     for (int i = columnoffset; i < args.length; i++) {
       if (i > columnoffset) {
         sb.append(" ");
@@ -97,20 +96,21 @@
       sb.append(args[i]);
     }
     Scan scan = new Scan();
-    for(String columnName : sb.toString().split(" ")) {
-      String [] fields = columnName.split(":");
-      if(fields.length == 1) {
-        scan.addFamily(Bytes.toBytes(fields[0]));
-      } else {
-        scan.addColumn(Bytes.toBytes(fields[0]), Bytes.toBytes(fields[1]));
+    if (sb.length() > 0) {
+      for (String columnName :sb.toString().split(" ")) {
+        String [] fields = columnName.split(":");
+        if(fields.length == 1) {
+          scan.addFamily(Bytes.toBytes(fields[0]));
+        } else {
+          scan.addColumn(Bytes.toBytes(fields[0]), Bytes.toBytes(fields[1]));
+        }
       }
-    }
+    }
     // Second argument is the table name.
-    TableMapReduceUtil.initTableMapperJob(args[1], scan,
+    job.setOutputFormatClass(NullOutputFormat.class);
+    TableMapReduceUtil.initTableMapperJob(tableName, scan,
       RowCounterMapper.class, ImmutableBytesWritable.class, Result.class, job);
     job.setNumReduceTasks(0);
-    // first argument is the output directory.
-    FileOutputFormat.setOutputPath(job, new Path(args[0]));
     return job;
   }
 
@@ -123,10 +123,9 @@
   public static void main(String[] args) throws Exception {
     HBaseConfiguration conf = new HBaseConfiguration();
     String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
-    if (otherArgs.length < 3) {
+    if (otherArgs.length < 1) {
       System.err.println("ERROR: Wrong number of parameters: " + args.length);
-      System.err.println("Usage: " + NAME +
-        " <outputdir> <tablename> <column1> [<column2>...]");
+      System.err.println("Usage: RowCounter <tablename> [<column1> <column2>...]");
       System.exit(-1);
     }
     Job job = createSubmittableJob(conf, otherArgs);

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java?rev=816038&r1=816037&r2=816038&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java Thu Sep 17 03:25:35 2009
@@ -248,7 +248,7 @@
     if (trr == null) {
       trr = new TableRecordReader();
     }
-    Scan sc = new Scan(scan);
+    Scan sc = new Scan(this.scan);
     sc.setStartRow(tSplit.getStartRow());
     sc.setStopRow(tSplit.getEndRow());
     trr.setScan(sc);
@@ -276,9 +276,6 @@
     if (table == null) {
       throw new IOException("No table was provided.");
     }
-    if (!scan.hasFamilies()) {
-      throw new IOException("Expecting at least one column.");
-    }
     int realNumSplits = startKeys.length;
     InputSplit[] splits = new InputSplit[realNumSplits];
     int middle = startKeys.length / realNumSplits;
@@ -319,7 +316,7 @@
    * @return The internal scan instance.
    */
   public Scan getScan() {
-    if (scan == null) scan = new Scan();
+    if (this.scan == null) this.scan = new Scan();
     return scan;
   }

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java?rev=816038&r1=816037&r2=816038&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java Thu Sep 17 03:25:35 2009
@@ -58,11 +58,11 @@
   Class<? extends WritableComparable> outputKeyClass,
   Class<? extends Writable> outputValueClass, Job job) throws IOException {
     job.setInputFormatClass(TableInputFormat.class);
-    job.setMapOutputValueClass(outputValueClass);
-    job.setMapOutputKeyClass(outputKeyClass);
+    if (outputValueClass != null) job.setMapOutputValueClass(outputValueClass);
+    if (outputKeyClass != null) job.setMapOutputKeyClass(outputKeyClass);
     job.setMapperClass(mapper);
     job.getConfiguration().set(TableInputFormat.INPUT_TABLE, table);
-    job.getConfiguration().set(TableInputFormat.SCAN, 
+    job.getConfiguration().set(TableInputFormat.SCAN,
       convertScanToString(scan));
   }
 
@@ -125,7 +125,7 @@
   Class<? extends TableReducer> reducer, Job job, Class partitioner)
   throws IOException {
     job.setOutputFormatClass(TableOutputFormat.class);
-    job.setReducerClass(reducer);
+    if (reducer != null) job.setReducerClass(reducer);
     job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, table);
     job.setOutputKeyClass(ImmutableBytesWritable.class);
     job.setOutputValueClass(Put.class);
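
For reference, a minimal programmatic sketch of how the two new tools could be wired together through the createSubmittableJob helpers added above. This is illustrative only and not part of the commit; the class name, table name "mytable", and path "/backup/mytable" are placeholders, and the usual entry points remain the "export" and "import" programs registered in Driver.

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.mapreduce.Export;
import org.apache.hadoop.hbase.mapreduce.Import;
import org.apache.hadoop.mapreduce.Job;

/** Illustrative only: round-trips a table through the new Export and Import jobs. */
public class ExportImportRoundTrip {
  public static void main(String[] args) throws Exception {
    HBaseConfiguration conf = new HBaseConfiguration();

    // Dump table 'mytable' to sequence files under /backup/mytable (placeholder names).
    Job export = Export.createSubmittableJob(conf,
      new String[] {"mytable", "/backup/mytable"});
    if (!export.waitForCompletion(true)) {
      System.exit(1);
    }

    // Read the exported sequence files back into the same table as Puts.
    Job restore = Import.createSubmittableJob(conf,
      new String[] {"mytable", "/backup/mytable"});
    System.exit(restore.waitForCompletion(true) ? 0 : 1);
  }
}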