http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java ---------------------------------------------------------------------- diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java new file mode 100644 index 0000000..b64271e --- /dev/null +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java @@ -0,0 +1,793 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mapreduce; + +import static java.lang.String.format; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.Set; + +import org.apache.commons.lang.StringUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.TableNotEnabledException; +import org.apache.hadoop.hbase.TableNotFoundException; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.RegionLocator; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.util.Base64; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; + +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; +import org.apache.hadoop.hbase.client.TableDescriptor; +import org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions; +import org.apache.hadoop.hbase.shaded.com.google.common.base.Splitter; +import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists; + 
+/** + * Tool to import data from a TSV file. + * + * This tool is rather simplistic - it doesn't do any quoting or + * escaping, but is useful for many data loads. + * + * @see ImportTsv#usage(String) + */ +@InterfaceAudience.Public +public class ImportTsv extends Configured implements Tool { + + protected static final Log LOG = LogFactory.getLog(ImportTsv.class); + + final static String NAME = "importtsv"; + + public final static String MAPPER_CONF_KEY = "importtsv.mapper.class"; + public final static String BULK_OUTPUT_CONF_KEY = "importtsv.bulk.output"; + public final static String TIMESTAMP_CONF_KEY = "importtsv.timestamp"; + public final static String JOB_NAME_CONF_KEY = "mapreduce.job.name"; + // TODO: the rest of these configs are used exclusively by TsvImporterMapper. + // Move them out of the tool and let the mapper handle its own validation. + public final static String DRY_RUN_CONF_KEY = "importtsv.dry.run"; + // If true, bad lines are logged to stderr. Default: false. + public final static String LOG_BAD_LINES_CONF_KEY = "importtsv.log.bad.lines"; + public final static String SKIP_LINES_CONF_KEY = "importtsv.skip.bad.lines"; + public final static String SKIP_EMPTY_COLUMNS = "importtsv.skip.empty.columns"; + public final static String COLUMNS_CONF_KEY = "importtsv.columns"; + public final static String SEPARATOR_CONF_KEY = "importtsv.separator"; + public final static String ATTRIBUTE_SEPERATOR_CONF_KEY = "attributes.seperator"; + //This config is used to propagate credentials from parent MR jobs which launch + //ImportTSV jobs. SEE IntegrationTestImportTsv. + public final static String CREDENTIALS_LOCATION = "credentials_location"; + final static String DEFAULT_SEPARATOR = "\t"; + final static String DEFAULT_ATTRIBUTES_SEPERATOR = "=>"; + final static String DEFAULT_MULTIPLE_ATTRIBUTES_SEPERATOR = ","; + final static Class DEFAULT_MAPPER = TsvImporterMapper.class; + public final static String CREATE_TABLE_CONF_KEY = "create.table"; + public final static String NO_STRICT_COL_FAMILY = "no.strict"; + /** + * If table didn't exist and was created in dry-run mode, this flag is + * flipped to delete it when MR ends.
+ */ + private static boolean DRY_RUN_TABLE_CREATED; + + public static class TsvParser { + /** + * Column families and qualifiers mapped to the TSV columns + */ + private final byte[][] families; + private final byte[][] qualifiers; + + private final byte separatorByte; + + private int rowKeyColumnIndex; + + private int maxColumnCount; + + // Default value must be negative + public static final int DEFAULT_TIMESTAMP_COLUMN_INDEX = -1; + + private int timestampKeyColumnIndex = DEFAULT_TIMESTAMP_COLUMN_INDEX; + + public static final String ROWKEY_COLUMN_SPEC = "HBASE_ROW_KEY"; + + public static final String TIMESTAMPKEY_COLUMN_SPEC = "HBASE_TS_KEY"; + + public static final String ATTRIBUTES_COLUMN_SPEC = "HBASE_ATTRIBUTES_KEY"; + + public static final String CELL_VISIBILITY_COLUMN_SPEC = "HBASE_CELL_VISIBILITY"; + + public static final String CELL_TTL_COLUMN_SPEC = "HBASE_CELL_TTL"; + + private int attrKeyColumnIndex = DEFAULT_ATTRIBUTES_COLUMN_INDEX; + + public static final int DEFAULT_ATTRIBUTES_COLUMN_INDEX = -1; + + public static final int DEFAULT_CELL_VISIBILITY_COLUMN_INDEX = -1; + + public static final int DEFAULT_CELL_TTL_COLUMN_INDEX = -1; + + private int cellVisibilityColumnIndex = DEFAULT_CELL_VISIBILITY_COLUMN_INDEX; + + private int cellTTLColumnIndex = DEFAULT_CELL_TTL_COLUMN_INDEX; + + /** + * @param columnsSpecification the list of columns to parser out, comma separated. + * The row key should be the special token TsvParser.ROWKEY_COLUMN_SPEC + * @param separatorStr + */ + public TsvParser(String columnsSpecification, String separatorStr) { + // Configure separator + byte[] separator = Bytes.toBytes(separatorStr); + Preconditions.checkArgument(separator.length == 1, + "TsvParser only supports single-byte separators"); + separatorByte = separator[0]; + + // Configure columns + ArrayList<String> columnStrings = Lists.newArrayList( + Splitter.on(',').trimResults().split(columnsSpecification)); + + maxColumnCount = columnStrings.size(); + families = new byte[maxColumnCount][]; + qualifiers = new byte[maxColumnCount][]; + + for (int i = 0; i < columnStrings.size(); i++) { + String str = columnStrings.get(i); + if (ROWKEY_COLUMN_SPEC.equals(str)) { + rowKeyColumnIndex = i; + continue; + } + if (TIMESTAMPKEY_COLUMN_SPEC.equals(str)) { + timestampKeyColumnIndex = i; + continue; + } + if (ATTRIBUTES_COLUMN_SPEC.equals(str)) { + attrKeyColumnIndex = i; + continue; + } + if (CELL_VISIBILITY_COLUMN_SPEC.equals(str)) { + cellVisibilityColumnIndex = i; + continue; + } + if (CELL_TTL_COLUMN_SPEC.equals(str)) { + cellTTLColumnIndex = i; + continue; + } + String[] parts = str.split(":", 2); + if (parts.length == 1) { + families[i] = str.getBytes(); + qualifiers[i] = HConstants.EMPTY_BYTE_ARRAY; + } else { + families[i] = parts[0].getBytes(); + qualifiers[i] = parts[1].getBytes(); + } + } + } + + public boolean hasTimestamp() { + return timestampKeyColumnIndex != DEFAULT_TIMESTAMP_COLUMN_INDEX; + } + + public int getTimestampKeyColumnIndex() { + return timestampKeyColumnIndex; + } + + public boolean hasAttributes() { + return attrKeyColumnIndex != DEFAULT_ATTRIBUTES_COLUMN_INDEX; + } + + public boolean hasCellVisibility() { + return cellVisibilityColumnIndex != DEFAULT_CELL_VISIBILITY_COLUMN_INDEX; + } + + public boolean hasCellTTL() { + return cellTTLColumnIndex != DEFAULT_CELL_VISIBILITY_COLUMN_INDEX; + } + + public int getAttributesKeyColumnIndex() { + return attrKeyColumnIndex; + } + + public int getCellVisibilityColumnIndex() { + return cellVisibilityColumnIndex; + } + + public int 
getCellTTLColumnIndex() { + return cellTTLColumnIndex; + } + + public int getRowKeyColumnIndex() { + return rowKeyColumnIndex; + } + + public byte[] getFamily(int idx) { + return families[idx]; + } + public byte[] getQualifier(int idx) { + return qualifiers[idx]; + } + + public ParsedLine parse(byte[] lineBytes, int length) + throws BadTsvLineException { + // Enumerate separator offsets + ArrayList<Integer> tabOffsets = new ArrayList<>(maxColumnCount); + for (int i = 0; i < length; i++) { + if (lineBytes[i] == separatorByte) { + tabOffsets.add(i); + } + } + if (tabOffsets.isEmpty()) { + throw new BadTsvLineException("No delimiter"); + } + + tabOffsets.add(length); + + if (tabOffsets.size() > maxColumnCount) { + throw new BadTsvLineException("Excessive columns"); + } else if (tabOffsets.size() <= getRowKeyColumnIndex()) { + throw new BadTsvLineException("No row key"); + } else if (hasTimestamp() + && tabOffsets.size() <= getTimestampKeyColumnIndex()) { + throw new BadTsvLineException("No timestamp"); + } else if (hasAttributes() && tabOffsets.size() <= getAttributesKeyColumnIndex()) { + throw new BadTsvLineException("No attributes specified"); + } else if (hasCellVisibility() && tabOffsets.size() <= getCellVisibilityColumnIndex()) { + throw new BadTsvLineException("No cell visibility specified"); + } else if (hasCellTTL() && tabOffsets.size() <= getCellTTLColumnIndex()) { + throw new BadTsvLineException("No cell TTL specified"); + } + return new ParsedLine(tabOffsets, lineBytes); + } + + class ParsedLine { + private final ArrayList<Integer> tabOffsets; + private byte[] lineBytes; + + ParsedLine(ArrayList<Integer> tabOffsets, byte[] lineBytes) { + this.tabOffsets = tabOffsets; + this.lineBytes = lineBytes; + } + + public int getRowKeyOffset() { + return getColumnOffset(rowKeyColumnIndex); + } + public int getRowKeyLength() { + return getColumnLength(rowKeyColumnIndex); + } + + public long getTimestamp(long ts) throws BadTsvLineException { + // Return ts if HBASE_TS_KEY is not configured in column spec + if (!hasTimestamp()) { + return ts; + } + + String timeStampStr = Bytes.toString(lineBytes, + getColumnOffset(timestampKeyColumnIndex), + getColumnLength(timestampKeyColumnIndex)); + try { + return Long.parseLong(timeStampStr); + } catch (NumberFormatException nfe) { + // treat this record as bad record + throw new BadTsvLineException("Invalid timestamp " + timeStampStr); + } + } + + private String getAttributes() { + if (!hasAttributes()) { + return null; + } else { + return Bytes.toString(lineBytes, getColumnOffset(attrKeyColumnIndex), + getColumnLength(attrKeyColumnIndex)); + } + } + + public String[] getIndividualAttributes() { + String attributes = getAttributes(); + if (attributes != null) { + return attributes.split(DEFAULT_MULTIPLE_ATTRIBUTES_SEPERATOR); + } else { + return null; + } + } + + public int getAttributeKeyOffset() { + if (hasAttributes()) { + return getColumnOffset(attrKeyColumnIndex); + } else { + return DEFAULT_ATTRIBUTES_COLUMN_INDEX; + } + } + + public int getAttributeKeyLength() { + if (hasAttributes()) { + return getColumnLength(attrKeyColumnIndex); + } else { + return DEFAULT_ATTRIBUTES_COLUMN_INDEX; + } + } + + public int getCellVisibilityColumnOffset() { + if (hasCellVisibility()) { + return getColumnOffset(cellVisibilityColumnIndex); + } else { + return DEFAULT_CELL_VISIBILITY_COLUMN_INDEX; + } + } + + public int getCellVisibilityColumnLength() { + if (hasCellVisibility()) { + return getColumnLength(cellVisibilityColumnIndex); + } else { + return 
DEFAULT_CELL_VISIBILITY_COLUMN_INDEX; + } + } + + public String getCellVisibility() { + if (!hasCellVisibility()) { + return null; + } else { + return Bytes.toString(lineBytes, getColumnOffset(cellVisibilityColumnIndex), + getColumnLength(cellVisibilityColumnIndex)); + } + } + + public int getCellTTLColumnOffset() { + if (hasCellTTL()) { + return getColumnOffset(cellTTLColumnIndex); + } else { + return DEFAULT_CELL_TTL_COLUMN_INDEX; + } + } + + public int getCellTTLColumnLength() { + if (hasCellTTL()) { + return getColumnLength(cellTTLColumnIndex); + } else { + return DEFAULT_CELL_TTL_COLUMN_INDEX; + } + } + + public long getCellTTL() { + if (!hasCellTTL()) { + return 0; + } else { + return Bytes.toLong(lineBytes, getColumnOffset(cellTTLColumnIndex), + getColumnLength(cellTTLColumnIndex)); + } + } + + public int getColumnOffset(int idx) { + if (idx > 0) + return tabOffsets.get(idx - 1) + 1; + else + return 0; + } + public int getColumnLength(int idx) { + return tabOffsets.get(idx) - getColumnOffset(idx); + } + public int getColumnCount() { + return tabOffsets.size(); + } + public byte[] getLineBytes() { + return lineBytes; + } + } + + public static class BadTsvLineException extends Exception { + public BadTsvLineException(String err) { + super(err); + } + private static final long serialVersionUID = 1L; + } + + /** + * Return starting position and length of row key from the specified line bytes. + * @param lineBytes + * @param length + * @return Pair of row key offset and length. + * @throws BadTsvLineException + */ + public Pair<Integer, Integer> parseRowKey(byte[] lineBytes, int length) + throws BadTsvLineException { + int rkColumnIndex = 0; + int startPos = 0, endPos = 0; + for (int i = 0; i <= length; i++) { + if (i == length || lineBytes[i] == separatorByte) { + endPos = i - 1; + if (rkColumnIndex++ == getRowKeyColumnIndex()) { + if ((endPos + 1) == startPos) { + throw new BadTsvLineException("Empty value for ROW KEY."); + } + break; + } else { + startPos = endPos + 2; + } + } + if (i == length) { + throw new BadTsvLineException( + "Row key does not exist as number of columns in the line" + + " are less than row key position."); + } + } + return new Pair<>(startPos, endPos - startPos + 1); + } + } + + /** + * Sets up the actual job. + * + * @param conf The current configuration. + * @param args The command line parameters. + * @return The newly created job. + * @throws IOException When setting up the job fails. + */ + protected static Job createSubmittableJob(Configuration conf, String[] args) + throws IOException, ClassNotFoundException { + Job job = null; + boolean isDryRun = conf.getBoolean(DRY_RUN_CONF_KEY, false); + try (Connection connection = ConnectionFactory.createConnection(conf)) { + try (Admin admin = connection.getAdmin()) { + // Support non-XML supported characters + // by re-encoding the passed separator as a Base64 string. + String actualSeparator = conf.get(SEPARATOR_CONF_KEY); + if (actualSeparator != null) { + conf.set(SEPARATOR_CONF_KEY, + Base64.encodeBytes(actualSeparator.getBytes())); + } + + // See if a non-default Mapper was set + String mapperClassName = conf.get(MAPPER_CONF_KEY); + Class mapperClass = mapperClassName != null? 
Class.forName(mapperClassName): DEFAULT_MAPPER; + + TableName tableName = TableName.valueOf(args[0]); + Path inputDir = new Path(args[1]); + String jobName = conf.get(JOB_NAME_CONF_KEY,NAME + "_" + tableName.getNameAsString()); + job = Job.getInstance(conf, jobName); + job.setJarByClass(mapperClass); + FileInputFormat.setInputPaths(job, inputDir); + job.setInputFormatClass(TextInputFormat.class); + job.setMapperClass(mapperClass); + job.setMapOutputKeyClass(ImmutableBytesWritable.class); + String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY); + String[] columns = conf.getStrings(COLUMNS_CONF_KEY); + if(StringUtils.isNotEmpty(conf.get(CREDENTIALS_LOCATION))) { + String fileLoc = conf.get(CREDENTIALS_LOCATION); + Credentials cred = Credentials.readTokenStorageFile(new File(fileLoc), conf); + job.getCredentials().addAll(cred); + } + + if (hfileOutPath != null) { + if (!admin.tableExists(tableName)) { + LOG.warn(format("Table '%s' does not exist.", tableName)); + if ("yes".equalsIgnoreCase(conf.get(CREATE_TABLE_CONF_KEY, "yes"))) { + // TODO: this is backwards. Instead of depending on the existence of a table, + // create a sane splits file for HFileOutputFormat based on data sampling. + createTable(admin, tableName, columns); + if (isDryRun) { + LOG.warn("Dry run: Table will be deleted at end of dry run."); + synchronized (ImportTsv.class) { + DRY_RUN_TABLE_CREATED = true; + } + } + } else { + String errorMsg = + format("Table '%s' does not exist and '%s' is set to no.", tableName, + CREATE_TABLE_CONF_KEY); + LOG.error(errorMsg); + throw new TableNotFoundException(errorMsg); + } + } + try (Table table = connection.getTable(tableName); + RegionLocator regionLocator = connection.getRegionLocator(tableName)) { + boolean noStrict = conf.getBoolean(NO_STRICT_COL_FAMILY, false); + // if no.strict is false then check column family + if(!noStrict) { + ArrayList<String> unmatchedFamilies = new ArrayList<>(); + Set<String> cfSet = getColumnFamilies(columns); + TableDescriptor tDesc = table.getDescriptor(); + for (String cf : cfSet) { + if(!tDesc.hasColumnFamily(Bytes.toBytes(cf))) { + unmatchedFamilies.add(cf); + } + } + if(unmatchedFamilies.size() > 0) { + ArrayList<String> familyNames = new ArrayList<>(); + for (ColumnFamilyDescriptor family : table.getDescriptor().getColumnFamilies()) { + familyNames.add(family.getNameAsString()); + } + String msg = + "Column Families " + unmatchedFamilies + " specified in " + COLUMNS_CONF_KEY + + " does not match with any of the table " + tableName + + " column families " + familyNames + ".\n" + + "To disable column family check, use -D" + NO_STRICT_COL_FAMILY + + "=true.\n"; + usage(msg); + System.exit(-1); + } + } + if (mapperClass.equals(TsvImporterTextMapper.class)) { + job.setMapOutputValueClass(Text.class); + job.setReducerClass(TextSortReducer.class); + } else { + job.setMapOutputValueClass(Put.class); + job.setCombinerClass(PutCombiner.class); + job.setReducerClass(PutSortReducer.class); + } + if (!isDryRun) { + Path outputDir = new Path(hfileOutPath); + FileOutputFormat.setOutputPath(job, outputDir); + HFileOutputFormat2.configureIncrementalLoad(job, table.getDescriptor(), + regionLocator); + } + } + } else { + if (!admin.tableExists(tableName)) { + String errorMsg = format("Table '%s' does not exist.", tableName); + LOG.error(errorMsg); + throw new TableNotFoundException(errorMsg); + } + if (mapperClass.equals(TsvImporterTextMapper.class)) { + usage(TsvImporterTextMapper.class.toString() + + " should not be used for non bulkloading case. 
use " + + TsvImporterMapper.class.toString() + + " or custom mapper whose value type is Put."); + System.exit(-1); + } + if (!isDryRun) { + // No reducers. Just write straight to table. Call initTableReducerJob + // to set up the TableOutputFormat. + TableMapReduceUtil.initTableReducerJob(tableName.getNameAsString(), null, job); + } + job.setNumReduceTasks(0); + } + if (isDryRun) { + job.setOutputFormatClass(NullOutputFormat.class); + job.getConfiguration().setStrings("io.serializations", + job.getConfiguration().get("io.serializations"), + MutationSerialization.class.getName(), ResultSerialization.class.getName(), + KeyValueSerialization.class.getName()); + } + TableMapReduceUtil.addDependencyJars(job); + TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(), + org.apache.hadoop.hbase.shaded.com.google.common.base.Function.class /* Guava used by TsvParser */); + } + } + return job; + } + + private static void createTable(Admin admin, TableName tableName, String[] columns) + throws IOException { + HTableDescriptor htd = new HTableDescriptor(tableName); + Set<String> cfSet = getColumnFamilies(columns); + for (String cf : cfSet) { + HColumnDescriptor hcd = new HColumnDescriptor(Bytes.toBytes(cf)); + htd.addFamily(hcd); + } + LOG.warn(format("Creating table '%s' with '%s' columns and default descriptors.", + tableName, cfSet)); + admin.createTable(htd); + } + + private static void deleteTable(Configuration conf, String[] args) { + TableName tableName = TableName.valueOf(args[0]); + try (Connection connection = ConnectionFactory.createConnection(conf); + Admin admin = connection.getAdmin()) { + try { + admin.disableTable(tableName); + } catch (TableNotEnabledException e) { + LOG.debug("Dry mode: Table: " + tableName + " already disabled, so just deleting it."); + } + admin.deleteTable(tableName); + } catch (IOException e) { + LOG.error(format("***Dry run: Failed to delete table '%s'.***%n%s", tableName, + e.toString())); + return; + } + LOG.info(format("Dry run: Deleted table '%s'.", tableName)); + } + + private static Set<String> getColumnFamilies(String[] columns) { + Set<String> cfSet = new HashSet<>(); + for (String aColumn : columns) { + if (TsvParser.ROWKEY_COLUMN_SPEC.equals(aColumn) + || TsvParser.TIMESTAMPKEY_COLUMN_SPEC.equals(aColumn) + || TsvParser.CELL_VISIBILITY_COLUMN_SPEC.equals(aColumn) + || TsvParser.CELL_TTL_COLUMN_SPEC.equals(aColumn) + || TsvParser.ATTRIBUTES_COLUMN_SPEC.equals(aColumn)) + continue; + // we are only concerned with the first one (in case this is a cf:cq) + cfSet.add(aColumn.split(":", 2)[0]); + } + return cfSet; + } + + /* + * @param errorMsg Error message. Can be null. + */ + private static void usage(final String errorMsg) { + if (errorMsg != null && errorMsg.length() > 0) { + System.err.println("ERROR: " + errorMsg); + } + String usage = + "Usage: " + NAME + " -D"+ COLUMNS_CONF_KEY + "=a,b,c <tablename> <inputdir>\n" + + "\n" + + "Imports the given input directory of TSV data into the specified table.\n" + + "\n" + + "The column names of the TSV data must be specified using the -D" + COLUMNS_CONF_KEY + "\n" + + "option. This option takes the form of comma-separated column names, where each\n" + + "column name is either a simple column family, or a columnfamily:qualifier. The special\n" + + "column name " + TsvParser.ROWKEY_COLUMN_SPEC + " is used to designate that this column should be used\n" + + "as the row key for each imported record. 
You must specify exactly one column\n" + + "to be the row key, and you must specify a column name for every column that exists in the\n" + + "input data. Another special column " + TsvParser.TIMESTAMPKEY_COLUMN_SPEC + + " designates that this column should be\n" + + "used as timestamp for each record. Unlike " + TsvParser.ROWKEY_COLUMN_SPEC + ", " + + TsvParser.TIMESTAMPKEY_COLUMN_SPEC + " is optional." + "\n" + + "You must specify at most one column as timestamp key for each imported record.\n" + + "Records with invalid timestamps (blank, non-numeric) will be treated as bad records.\n" + + "Note: if you use this option, then '" + TIMESTAMP_CONF_KEY + "' option will be ignored.\n" + + "\n" + + "Other special columns that can be specified are " + TsvParser.CELL_TTL_COLUMN_SPEC + + " and " + TsvParser.CELL_VISIBILITY_COLUMN_SPEC + ".\n" + + TsvParser.CELL_TTL_COLUMN_SPEC + " designates that this column will be used " + + "as a Cell's Time To Live (TTL) attribute.\n" + + TsvParser.CELL_VISIBILITY_COLUMN_SPEC + " designates that this column contains the " + + "visibility label expression.\n" + + "\n" + + TsvParser.ATTRIBUTES_COLUMN_SPEC+" can be used to specify Operation Attributes per record.\n"+ + " Should be specified as key=>value where "+ DEFAULT_ATTRIBUTES_SEPERATOR + " is used \n"+ + " as the separator. Note that more than one OperationAttribute can be specified.\n"+ + "By default importtsv will load data directly into HBase. To instead generate\n" + + "HFiles of data to prepare for a bulk data load, pass the option:\n" + + " -D" + BULK_OUTPUT_CONF_KEY + "=/path/for/output\n" + + " Note: if you do not use this option, then the target table must already exist in HBase\n" + + "\n" + + "Other options that may be specified with -D include:\n" + + " -D" + DRY_RUN_CONF_KEY + "=true - Dry run mode. Data is not actually populated into" + + " table. If table does not exist, it is created but deleted in the end.\n" + + " -D" + SKIP_LINES_CONF_KEY + "=false - fail if encountering an invalid line\n" + + " -D" + LOG_BAD_LINES_CONF_KEY + "=true - logs invalid lines to stderr\n" + + " -D" + SKIP_EMPTY_COLUMNS + "=false - If true then skip empty columns in bulk import\n" + + " '-D" + SEPARATOR_CONF_KEY + "=|' - eg separate on pipes instead of tabs\n" + + " -D" + TIMESTAMP_CONF_KEY + "=currentTimeAsLong - use the specified timestamp for the import\n" + + " -D" + MAPPER_CONF_KEY + "=my.Mapper - A user-defined Mapper to use instead of " + + DEFAULT_MAPPER.getName() + "\n" + + " -D" + JOB_NAME_CONF_KEY + "=jobName - use the specified mapreduce job name for the import\n" + + " -D" + CREATE_TABLE_CONF_KEY + "=no - can be used to avoid creation of table by this tool\n" + + " Note: if you set this to 'no', then the target table must already exist in HBase\n" + + " -D" + NO_STRICT_COL_FAMILY + "=true - ignore column family check in hbase table. " + + "Default is false\n\n" + + "For performance consider the following options:\n" + + " -Dmapreduce.map.speculative=false\n" + + " -Dmapreduce.reduce.speculative=false"; + + System.err.println(usage); + } + + @Override + public int run(String[] args) throws Exception { + if (args.length < 2) { + usage("Wrong number of arguments: " + args.length); + return -1; + } + + // When MAPPER_CONF_KEY is null, the user wants to use the provided TsvImporterMapper, so + // perform validation on these additional args. When it's not null, user has provided their + // own mapper, thus these validations are not relevant.
+ // TODO: validation for TsvImporterMapper, not this tool. Move elsewhere. + if (null == getConf().get(MAPPER_CONF_KEY)) { + // Make sure columns are specified + String[] columns = getConf().getStrings(COLUMNS_CONF_KEY); + if (columns == null) { + usage("No columns specified. Please specify with -D" + + COLUMNS_CONF_KEY+"=..."); + return -1; + } + + // Make sure they specify exactly one column as the row key + int rowkeysFound = 0; + for (String col : columns) { + if (col.equals(TsvParser.ROWKEY_COLUMN_SPEC)) rowkeysFound++; + } + if (rowkeysFound != 1) { + usage("Must specify exactly one column as " + TsvParser.ROWKEY_COLUMN_SPEC); + return -1; + } + + // Make sure we have at most one column as the timestamp key + int tskeysFound = 0; + for (String col : columns) { + if (col.equals(TsvParser.TIMESTAMPKEY_COLUMN_SPEC)) + tskeysFound++; + } + if (tskeysFound > 1) { + usage("Must specify at most one column as " + + TsvParser.TIMESTAMPKEY_COLUMN_SPEC); + return -1; + } + + int attrKeysFound = 0; + for (String col : columns) { + if (col.equals(TsvParser.ATTRIBUTES_COLUMN_SPEC)) + attrKeysFound++; + } + if (attrKeysFound > 1) { + usage("Must specify at most one column as " + + TsvParser.ATTRIBUTES_COLUMN_SPEC); + return -1; + } + + // Make sure one or more columns are specified excluding rowkey and + // timestamp key + if (columns.length - (rowkeysFound + tskeysFound + attrKeysFound) < 1) { + usage("One or more columns in addition to the row key and timestamp(optional) are required"); + return -1; + } + } + + // If timestamp option is not specified, use current system time. + long timstamp = getConf().getLong(TIMESTAMP_CONF_KEY, System.currentTimeMillis()); + + // Set it back to replace invalid timestamp (non-numeric) with current + // system time + getConf().setLong(TIMESTAMP_CONF_KEY, timstamp); + + synchronized (ImportTsv.class) { + DRY_RUN_TABLE_CREATED = false; + } + Job job = createSubmittableJob(getConf(), args); + boolean success = job.waitForCompletion(true); + boolean delete = false; + synchronized (ImportTsv.class) { + delete = DRY_RUN_TABLE_CREATED; + } + if (delete) { + deleteTable(getConf(), args); + } + return success ? 0 : 1; + } + + public static void main(String[] args) throws Exception { + int status = ToolRunner.run(HBaseConfiguration.create(), new ImportTsv(), args); + System.exit(status); + } +}
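A minimal sketch of driving the tool above from Java through ToolRunner; the table name "t1", the column mapping, and the paths are illustrative assumptions, not part of this patch:

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.hbase.HBaseConfiguration;
  import org.apache.hadoop.hbase.mapreduce.ImportTsv;
  import org.apache.hadoop.util.ToolRunner;

  public class ImportTsvDriver {
    public static void main(String[] args) throws Exception {
      Configuration conf = HBaseConfiguration.create();
      // First TSV column becomes the row key; the second lands in family "d", qualifier "c1".
      conf.set(ImportTsv.COLUMNS_CONF_KEY, "HBASE_ROW_KEY,d:c1");
      // Optional: emit HFiles for a later bulk load instead of writing Puts directly.
      conf.set(ImportTsv.BULK_OUTPUT_CONF_KEY, "/tmp/importtsv-out");
      // Arguments are <tablename> <inputdir>, exactly as in the usage text.
      int status = ToolRunner.run(conf, new ImportTsv(), new String[] { "t1", "/tmp/input-tsv" });
      System.exit(status);
    }
  }

With importtsv.bulk.output set, the job writes HFiles under the output path for a later incremental load; without it, the mappers write Puts straight to the table, which therefore must already exist.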
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/JarFinder.java ---------------------------------------------------------------------- diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/JarFinder.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/JarFinder.java new file mode 100644 index 0000000..953df62 --- /dev/null +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/JarFinder.java @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.hbase.mapreduce; + +import org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions; + +import java.io.BufferedOutputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.net.URL; +import java.net.URLDecoder; +import java.text.MessageFormat; +import java.util.Enumeration; +import java.util.jar.JarFile; +import java.util.jar.JarOutputStream; +import java.util.jar.Manifest; +import java.util.zip.ZipEntry; +import java.util.zip.ZipOutputStream; + +/** + * Finds the Jar for a class. If the class is in a directory in the + * classpath, it creates a Jar on the fly with the contents of the directory + * and returns the path to that Jar. If a Jar is created, it is created in + * the system temporary directory. + * + * This file was forked from hadoop/common/branches/branch-2@1377176. + */ +public class JarFinder { + + private static void copyToZipStream(File file, ZipEntry entry, + ZipOutputStream zos) throws IOException { + InputStream is = new FileInputStream(file); + try { + zos.putNextEntry(entry); + byte[] arr = new byte[4096]; + int read = is.read(arr); + while (read > -1) { + zos.write(arr, 0, read); + read = is.read(arr); + } + } finally { + try { + is.close(); + } finally { + zos.closeEntry(); + } + } + } + + public static void jarDir(File dir, String relativePath, ZipOutputStream zos) + throws IOException { + Preconditions.checkNotNull(relativePath, "relativePath"); + Preconditions.checkNotNull(zos, "zos"); + + // by JAR spec, if there is a manifest, it must be the first entry in the + // ZIP. 
+ File manifestFile = new File(dir, JarFile.MANIFEST_NAME); + ZipEntry manifestEntry = new ZipEntry(JarFile.MANIFEST_NAME); + if (!manifestFile.exists()) { + zos.putNextEntry(manifestEntry); + new Manifest().write(new BufferedOutputStream(zos)); + zos.closeEntry(); + } else { + copyToZipStream(manifestFile, manifestEntry, zos); + } + zos.closeEntry(); + zipDir(dir, relativePath, zos, true); + zos.close(); + } + + private static void zipDir(File dir, String relativePath, ZipOutputStream zos, + boolean start) throws IOException { + String[] dirList = dir.list(); + if (dirList == null) { + return; + } + for (String aDirList : dirList) { + File f = new File(dir, aDirList); + if (!f.isHidden()) { + if (f.isDirectory()) { + if (!start) { + ZipEntry dirEntry = new ZipEntry(relativePath + f.getName() + "/"); + zos.putNextEntry(dirEntry); + zos.closeEntry(); + } + String filePath = f.getPath(); + File file = new File(filePath); + zipDir(file, relativePath + f.getName() + "/", zos, false); + } + else { + String path = relativePath + f.getName(); + if (!path.equals(JarFile.MANIFEST_NAME)) { + ZipEntry anEntry = new ZipEntry(path); + copyToZipStream(f, anEntry, zos); + } + } + } + } + } + + private static void createJar(File dir, File jarFile) throws IOException { + Preconditions.checkNotNull(dir, "dir"); + Preconditions.checkNotNull(jarFile, "jarFile"); + File jarDir = jarFile.getParentFile(); + if (!jarDir.exists()) { + if (!jarDir.mkdirs()) { + throw new IOException(MessageFormat.format("could not create dir [{0}]", + jarDir)); + } + } + try (FileOutputStream fos = new FileOutputStream(jarFile); + JarOutputStream jos = new JarOutputStream(fos)) { + jarDir(dir, "", jos); + } + } + + /** + * Returns the full path to the Jar containing the class. It always return a + * JAR. + * + * @param klass class. + * + * @return path to the Jar containing the class. 
+ */ + public static String getJar(Class klass) { + Preconditions.checkNotNull(klass, "klass"); + ClassLoader loader = klass.getClassLoader(); + if (loader != null) { + String class_file = klass.getName().replaceAll("\\.", "/") + ".class"; + try { + for (Enumeration itr = loader.getResources(class_file); + itr.hasMoreElements(); ) { + URL url = (URL) itr.nextElement(); + String path = url.getPath(); + if (path.startsWith("file:")) { + path = path.substring("file:".length()); + } + path = URLDecoder.decode(path, "UTF-8"); + if ("jar".equals(url.getProtocol())) { + path = URLDecoder.decode(path, "UTF-8"); + return path.replaceAll("!.*$", ""); + } + else if ("file".equals(url.getProtocol())) { + String klassName = klass.getName(); + klassName = klassName.replace(".", "/") + ".class"; + path = path.substring(0, path.length() - klassName.length()); + File baseDir = new File(path); + File testDir = new File(System.getProperty("test.build.dir", "target/test-dir")); + testDir = testDir.getAbsoluteFile(); + if (!testDir.exists()) { + testDir.mkdirs(); + } + File tempJar = File.createTempFile("hadoop-", "", testDir); + tempJar = new File(tempJar.getAbsolutePath() + ".jar"); + tempJar.deleteOnExit(); + createJar(baseDir, tempJar); + return tempJar.getAbsolutePath(); + } + } + } + catch (IOException e) { + throw new RuntimeException(e); + } + } + return null; + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/KeyValueSerialization.java ---------------------------------------------------------------------- diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/KeyValueSerialization.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/KeyValueSerialization.java new file mode 100644 index 0000000..241608b --- /dev/null +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/KeyValueSerialization.java @@ -0,0 +1,88 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
 */ +package org.apache.hadoop.hbase.mapreduce; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; + +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.KeyValueUtil; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.io.serializer.Deserializer; +import org.apache.hadoop.io.serializer.Serialization; +import org.apache.hadoop.io.serializer.Serializer; + +@InterfaceAudience.Public +public class KeyValueSerialization implements Serialization<KeyValue> { + @Override + public boolean accept(Class<?> c) { + return KeyValue.class.isAssignableFrom(c); + } + + @Override + public KeyValueDeserializer getDeserializer(Class<KeyValue> t) { + return new KeyValueDeserializer(); + } + + @Override + public KeyValueSerializer getSerializer(Class<KeyValue> c) { + return new KeyValueSerializer(); + } + + public static class KeyValueDeserializer implements Deserializer<KeyValue> { + private DataInputStream dis; + + @Override + public void close() throws IOException { + this.dis.close(); + } + + @Override + public KeyValue deserialize(KeyValue ignore) throws IOException { + // I can't overwrite the passed in KV, not from a proto kv, not just yet. TODO + return KeyValueUtil.create(this.dis); + } + + @Override + public void open(InputStream is) throws IOException { + this.dis = new DataInputStream(is); + } + } + + public static class KeyValueSerializer implements Serializer<KeyValue> { + private DataOutputStream dos; + + @Override + public void close() throws IOException { + this.dos.close(); + } + + @Override + public void open(OutputStream os) throws IOException { + this.dos = new DataOutputStream(os); + } + + @Override + public void serialize(KeyValue kv) throws IOException { + KeyValueUtil.write(kv, this.dos); + } + } +}
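A minimal sketch of plugging this serialization into a job, following the same "io.serializations" pattern ImportTsv uses above for its dry-run mode; the job name is an illustrative assumption:

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.hbase.HBaseConfiguration;
  import org.apache.hadoop.hbase.mapreduce.KeyValueSerialization;
  import org.apache.hadoop.mapreduce.Job;

  public class KeyValueSerializationSetup {
    public static void main(String[] args) throws Exception {
      Configuration conf = HBaseConfiguration.create();
      Job job = Job.getInstance(conf, "keyvalue-serialization-demo");
      // Append KeyValueSerialization so KeyValue map output values can be shuffled.
      job.getConfiguration().setStrings("io.serializations",
          job.getConfiguration().get("io.serializations"),
          KeyValueSerialization.class.getName());
    }
  }

Appending to the existing value rather than overwriting it keeps Hadoop's default Writable serialization available alongside the HBase one.

http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/KeyValueSortReducer.java ---------------------------------------------------------------------- diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/KeyValueSortReducer.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/KeyValueSortReducer.java new file mode 100644 index 0000000..997e5a8 --- /dev/null +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/KeyValueSortReducer.java @@ -0,0 +1,57 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.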
 */ +package org.apache.hadoop.hbase.mapreduce; + +import java.util.TreeSet; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.CellComparator; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.mapreduce.Reducer; + +/** + * Emits sorted KeyValues. + * Reads in all KeyValues from passed Iterator, sorts them, then emits + * KeyValues in sorted order. If lots of columns per row, it will use lots of + * memory sorting. + * @see HFileOutputFormat2 + */ +@InterfaceAudience.Public +public class KeyValueSortReducer + extends Reducer<ImmutableBytesWritable, KeyValue, ImmutableBytesWritable, KeyValue> { + protected void reduce(ImmutableBytesWritable row, Iterable<KeyValue> kvs, + Reducer<ImmutableBytesWritable, KeyValue, ImmutableBytesWritable, KeyValue>.Context context) + throws java.io.IOException, InterruptedException { + TreeSet<KeyValue> map = new TreeSet<>(CellComparator.COMPARATOR); + for (KeyValue kv: kvs) { + try { + map.add(kv.clone()); + } catch (CloneNotSupportedException e) { + throw new java.io.IOException(e); + } + } + context.setStatus("Read " + map.getClass()); + int index = 0; + for (KeyValue kv: map) { + context.write(row, kv); + if (++index % 100 == 0) context.setStatus("Wrote " + index); + } + } +}
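A minimal sketch of a bulk-load driver that pairs this reducer with HFileOutputFormat2, assuming a mapper that already emits (ImmutableBytesWritable, KeyValue) pairs; the job name and output path are illustrative assumptions:

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.hbase.HBaseConfiguration;
  import org.apache.hadoop.hbase.KeyValue;
  import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
  import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
  import org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer;
  import org.apache.hadoop.mapreduce.Job;
  import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

  public class KeyValueSortSetup {
    public static void main(String[] args) throws Exception {
      Configuration conf = HBaseConfiguration.create();
      Job job = Job.getInstance(conf, "keyvalue-sort-demo");
      job.setMapOutputKeyClass(ImmutableBytesWritable.class);
      job.setMapOutputValueClass(KeyValue.class);
      // Sorts all KeyValues of a row in memory before writing them out.
      job.setReducerClass(KeyValueSortReducer.class);
      job.setOutputFormatClass(HFileOutputFormat2.class);
      FileOutputFormat.setOutputPath(job, new Path("/tmp/hfiles-out"));
      // In a real job, HFileOutputFormat2.configureIncrementalLoad(...) would also
      // set the partitioner and reducer count from the target table's regions.
    }
  }

http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableHFileOutputFormat.java ---------------------------------------------------------------------- diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableHFileOutputFormat.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableHFileOutputFormat.java new file mode 100644 index 0000000..9f783f1 --- /dev/null +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableHFileOutputFormat.java @@ -0,0 +1,122 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.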
 */ + +package org.apache.hadoop.hbase.mapreduce; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.TableDescriptor; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.mapreduce.Job; + +import java.io.IOException; +import java.nio.charset.Charset; +import java.util.List; +import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting; + +/** + * Creates a three-level directory tree: the first level uses the table name as the + * parent directory, the second uses the column family name as the child directory, + * and all related HFiles for one family are stored under that child directory: + * -tableName1 + * -columnFamilyName1 + * -columnFamilyName2 + * -HFiles + * -tableName2 + * -columnFamilyName1 + * -HFiles + * -columnFamilyName2 + */ +@InterfaceAudience.Public +@VisibleForTesting +public class MultiTableHFileOutputFormat extends HFileOutputFormat2 { + private static final Log LOG = LogFactory.getLog(MultiTableHFileOutputFormat.class); + + /** + * Creates a composite key to use as a mapper output key when using + * MultiTableHFileOutputFormat.configureIncrementalLoad to set up a bulk ingest job + * + * @param tableName Name of the Table - Eg: TableName.getNameAsString() + * @param suffix Usually represents a rowkey when creating a mapper key or column family + * @return byte[] representation of composite key + */ + public static byte[] createCompositeKey(byte[] tableName, + byte[] suffix) { + return combineTableNameSuffix(tableName, suffix); + } + + /** + * Alternate API which accepts an ImmutableBytesWritable for the suffix + * @see MultiTableHFileOutputFormat#createCompositeKey(byte[], byte[]) + */ + public static byte[] createCompositeKey(byte[] tableName, + ImmutableBytesWritable suffix) { + return combineTableNameSuffix(tableName, suffix.get()); + } + + /** + * Alternate API which accepts a String for the tableName and ImmutableBytesWritable for the + * suffix + * @see MultiTableHFileOutputFormat#createCompositeKey(byte[], byte[]) + */ + public static byte[] createCompositeKey(String tableName, + ImmutableBytesWritable suffix) { + return combineTableNameSuffix(tableName.getBytes(Charset.forName("UTF-8")), suffix.get()); + } + + /** + * Analogous to + * {@link HFileOutputFormat2#configureIncrementalLoad(Job, TableDescriptor, RegionLocator)}, + * this function will configure the requisite number of reducers to write HFiles for multiple + * tables simultaneously + * + * @param job See {@link org.apache.hadoop.mapreduce.Job} + * @param multiTableDescriptors Table descriptor and region locator pairs + * @throws IOException + */ + public static void configureIncrementalLoad(Job job, List<TableInfo> + multiTableDescriptors) + throws IOException { + MultiTableHFileOutputFormat.configureIncrementalLoad(job, multiTableDescriptors, + MultiTableHFileOutputFormat.class); + } + + final private static int validateCompositeKey(byte[] keyBytes) { + + int separatorIdx = Bytes.indexOf(keyBytes, tableSeparator); + + // Either the separator was not found or a tablename wasn't present or a key wasn't present + if (separatorIdx == -1) { + throw new IllegalArgumentException("Invalid format for composite key [" + Bytes + .toStringBinary(keyBytes) + "].
Cannot extract tablename and suffix from key"); + } + return separatorIdx; + } + + protected static byte[] getTableName(byte[] keyBytes) { + int separatorIdx = validateCompositeKey(keyBytes); + return Bytes.copy(keyBytes, 0, separatorIdx); + } + + protected static byte[] getSuffix(byte[] keyBytes) { + int separatorIdx = validateCompositeKey(keyBytes); + return Bytes.copy(keyBytes, separatorIdx+1, keyBytes.length - separatorIdx - 1); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableInputFormat.java ---------------------------------------------------------------------- diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableInputFormat.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableInputFormat.java new file mode 100644 index 0000000..f8fb6dc --- /dev/null +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableInputFormat.java @@ -0,0 +1,104 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mapreduce; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.client.Scan; + +/** + * Convert HBase tabular data from multiple scanners into a format that + * is consumable by Map/Reduce. + * + * <p> + * Usage example + * </p> + * + * <pre> + * List<Scan> scans = new ArrayList<Scan>(); + * + * Scan scan1 = new Scan(); + * scan1.setStartRow(firstRow1); + * scan1.setStopRow(lastRow1); + * scan1.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, table1); + * scans.add(scan1); + * + * Scan scan2 = new Scan(); + * scan2.setStartRow(firstRow2); + * scan2.setStopRow(lastRow2); + * scan2.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, table2); + * scans.add(scan2); + * + * TableMapReduceUtil.initTableMapperJob(scans, TableMapper.class, Text.class, + * IntWritable.class, job); + * </pre> + */ +@InterfaceAudience.Public +public class MultiTableInputFormat extends MultiTableInputFormatBase implements + Configurable { + + /** Job parameter that specifies the scan list. */ + public static final String SCANS = "hbase.mapreduce.scans"; + + /** The configuration. */ + private Configuration conf = null; + + /** + * Returns the current configuration. + * + * @return The current configuration. + * @see org.apache.hadoop.conf.Configurable#getConf() + */ + @Override + public Configuration getConf() { + return conf; + } + + /** + * Sets the configuration. This is used to set the details for the tables to + * be scanned.
+ * + * @param configuration The configuration to set. + * @see org.apache.hadoop.conf.Configurable#setConf( + * org.apache.hadoop.conf.Configuration) + */ + @Override + public void setConf(Configuration configuration) { + this.conf = configuration; + String[] rawScans = conf.getStrings(SCANS); + if (rawScans.length <= 0) { + throw new IllegalArgumentException("There must be at least 1 scan configuration set to : " + + SCANS); + } + List<Scan> scans = new ArrayList<>(); + + for (int i = 0; i < rawScans.length; i++) { + try { + scans.add(TableMapReduceUtil.convertStringToScan(rawScans[i])); + } catch (IOException e) { + throw new RuntimeException("Failed to convert Scan : " + rawScans[i] + " to string", e); + } + } + this.setScans(scans); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableInputFormatBase.java ---------------------------------------------------------------------- diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableInputFormatBase.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableInputFormatBase.java new file mode 100644 index 0000000..5d541a6 --- /dev/null +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableInputFormatBase.java @@ -0,0 +1,296 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mapreduce; + +import java.io.IOException; +import java.text.MessageFormat; +import java.util.ArrayList; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.RegionLocator; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.mapreduce.InputFormat; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; + +import java.util.Map; +import java.util.HashMap; +import java.util.Iterator; +/** + * A base for {@link MultiTableInputFormat}s. 
Receives a list of + * {@link Scan} instances that define the input tables and + * filters etc. Subclasses may use other TableRecordReader implementations. + */ +@InterfaceAudience.Public +public abstract class MultiTableInputFormatBase extends + InputFormat<ImmutableBytesWritable, Result> { + + private static final Log LOG = LogFactory.getLog(MultiTableInputFormatBase.class); + + /** Holds the set of scans used to define the input. */ + private List<Scan> scans; + + /** The reader scanning the table, can be a custom one. */ + private TableRecordReader tableRecordReader = null; + + /** + * Builds a TableRecordReader. If no TableRecordReader was provided, uses the + * default. + * + * @param split The split to work with. + * @param context The current context. + * @return The newly created record reader. + * @throws IOException When creating the reader fails. + * @throws InterruptedException when record reader initialization fails + * @see org.apache.hadoop.mapreduce.InputFormat#createRecordReader( + * org.apache.hadoop.mapreduce.InputSplit, + * org.apache.hadoop.mapreduce.TaskAttemptContext) + */ + @Override + public RecordReader<ImmutableBytesWritable, Result> createRecordReader( + InputSplit split, TaskAttemptContext context) + throws IOException, InterruptedException { + TableSplit tSplit = (TableSplit) split; + LOG.info(MessageFormat.format("Input split length: {0} bytes.", tSplit.getLength())); + + if (tSplit.getTable() == null) { + throw new IOException("Cannot create a record reader because of a" + + " previous error. Please look at the previous log lines from" + + " the task's full log for more details."); + } + final Connection connection = ConnectionFactory.createConnection(context.getConfiguration()); + Table table = connection.getTable(tSplit.getTable()); + + if (this.tableRecordReader == null) { + this.tableRecordReader = new TableRecordReader(); + } + final TableRecordReader trr = this.tableRecordReader; + + try { + Scan sc = tSplit.getScan(); + sc.setStartRow(tSplit.getStartRow()); + sc.setStopRow(tSplit.getEndRow()); + trr.setScan(sc); + trr.setTable(table); + return new RecordReader<ImmutableBytesWritable, Result>() { + + @Override + public void close() throws IOException { + trr.close(); + connection.close(); + } + + @Override + public ImmutableBytesWritable getCurrentKey() throws IOException, InterruptedException { + return trr.getCurrentKey(); + } + + @Override + public Result getCurrentValue() throws IOException, InterruptedException { + return trr.getCurrentValue(); + } + + @Override + public float getProgress() throws IOException, InterruptedException { + return trr.getProgress(); + } + + @Override + public void initialize(InputSplit inputsplit, TaskAttemptContext context) + throws IOException, InterruptedException { + trr.initialize(inputsplit, context); + } + + @Override + public boolean nextKeyValue() throws IOException, InterruptedException { + return trr.nextKeyValue(); + } + }; + } catch (IOException ioe) { + // If there is an exception make sure that all + // resources are closed and released. + trr.close(); + connection.close(); + throw ioe; + } + } + + /** + * Calculates the splits that will serve as input for the map tasks. The + * number of splits matches the number of regions in a table. + * + * @param context The current job context. + * @return The list of input splits. + * @throws IOException When creating the list of splits fails.
+ * @see org.apache.hadoop.mapreduce.InputFormat#getSplits(org.apache.hadoop.mapreduce.JobContext) + */ + @Override + public List<InputSplit> getSplits(JobContext context) throws IOException { + if (scans.isEmpty()) { + throw new IOException("No scans were provided."); + } + + Map<TableName, List<Scan>> tableMaps = new HashMap<>(); + for (Scan scan : scans) { + byte[] tableNameBytes = scan.getAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME); + if (tableNameBytes == null) + throw new IOException("A scan object did not have a table name"); + + TableName tableName = TableName.valueOf(tableNameBytes); + + List<Scan> scanList = tableMaps.get(tableName); + if (scanList == null) { + scanList = new ArrayList<>(); + tableMaps.put(tableName, scanList); + } + scanList.add(scan); + } + + List<InputSplit> splits = new ArrayList<>(); + Iterator iter = tableMaps.entrySet().iterator(); + while (iter.hasNext()) { + Map.Entry<TableName, List<Scan>> entry = (Map.Entry<TableName, List<Scan>>) iter.next(); + TableName tableName = entry.getKey(); + List<Scan> scanList = entry.getValue(); + + try (Connection conn = ConnectionFactory.createConnection(context.getConfiguration()); + Table table = conn.getTable(tableName); + RegionLocator regionLocator = conn.getRegionLocator(tableName)) { + RegionSizeCalculator sizeCalculator = new RegionSizeCalculator( + regionLocator, conn.getAdmin()); + Pair<byte[][], byte[][]> keys = regionLocator.getStartEndKeys(); + for (Scan scan : scanList) { + if (keys == null || keys.getFirst() == null || keys.getFirst().length == 0) { + throw new IOException("Expecting at least one region for table : " + + tableName.getNameAsString()); + } + int count = 0; + + byte[] startRow = scan.getStartRow(); + byte[] stopRow = scan.getStopRow(); + + for (int i = 0; i < keys.getFirst().length; i++) { + if (!includeRegionInSplit(keys.getFirst()[i], keys.getSecond()[i])) { + continue; + } + + if ((startRow.length == 0 || keys.getSecond()[i].length == 0 || + Bytes.compareTo(startRow, keys.getSecond()[i]) < 0) && + (stopRow.length == 0 || Bytes.compareTo(stopRow, + keys.getFirst()[i]) > 0)) { + byte[] splitStart = startRow.length == 0 || + Bytes.compareTo(keys.getFirst()[i], startRow) >= 0 ? + keys.getFirst()[i] : startRow; + byte[] splitStop = (stopRow.length == 0 || + Bytes.compareTo(keys.getSecond()[i], stopRow) <= 0) && + keys.getSecond()[i].length > 0 ? + keys.getSecond()[i] : stopRow; + + HRegionLocation hregionLocation = regionLocator.getRegionLocation( + keys.getFirst()[i], false); + String regionHostname = hregionLocation.getHostname(); + HRegionInfo regionInfo = hregionLocation.getRegionInfo(); + String encodedRegionName = regionInfo.getEncodedName(); + long regionSize = sizeCalculator.getRegionSize( + regionInfo.getRegionName()); + + TableSplit split = new TableSplit(table.getName(), + scan, splitStart, splitStop, regionHostname, + encodedRegionName, regionSize); + + splits.add(split); + + if (LOG.isDebugEnabled()) + LOG.debug("getSplits: split -> " + (count++) + " -> " + split); + } + } + } + } + } + + return splits; + } + + /** + * Test if the given region is to be included in the InputSplit while + * splitting the regions of a table. + * <p> + * This optimization is effective when there is a specific reasoning to + * exclude an entire region from the M-R job, (and hence, not contributing to + * the InputSplit), given the start and end keys of the same. 
<br> + * Useful when we need to remember the last-processed top record and revisit + * the [last, current) interval for M-R processing, continuously. In addition + * to reducing InputSplits, reduces the load on the region server as well, due + * to the ordering of the keys. <br> + * <br> + * Note: It is possible that <code>endKey.length() == 0 </code> , for the last + * (recent) region. <br> + * Override this method, if you want to bulk exclude regions altogether from + * M-R. By default, no region is excluded( i.e. all regions are included). + * + * @param startKey Start key of the region + * @param endKey End key of the region + * @return true, if this region needs to be included as part of the input + * (default). + */ + protected boolean includeRegionInSplit(final byte[] startKey, + final byte[] endKey) { + return true; + } + + /** + * Allows subclasses to get the list of {@link Scan} objects. + */ + protected List<Scan> getScans() { + return this.scans; + } + + /** + * Allows subclasses to set the list of {@link Scan} objects. + * + * @param scans The list of {@link Scan} used to define the input + */ + protected void setScans(List<Scan> scans) { + this.scans = scans; + } + + /** + * Allows subclasses to set the {@link TableRecordReader}. + * + * @param tableRecordReader A different {@link TableRecordReader} + * implementation. + */ + protected void setTableRecordReader(TableRecordReader tableRecordReader) { + this.tableRecordReader = tableRecordReader; + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableOutputFormat.java ---------------------------------------------------------------------- diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableOutputFormat.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableOutputFormat.java new file mode 100644 index 0000000..4cc784f --- /dev/null +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableOutputFormat.java @@ -0,0 +1,176 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
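[Editor's note] A word on wiring this up: getSplits() above rejects any Scan that lacks the
SCAN_ATTRIBUTES_TABLE_NAME attribute, so callers must tag each scan with its source table
before submitting the job. A minimal job-setup sketch, not part of this patch, using the
existing TableMapReduceUtil.initTableMapperJob overload that accepts a list of scans
("table1", "table2" and MyMapper are placeholders; MyMapper is assumed to be a
TableMapper<Text, LongWritable>):

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;

    public class MultiScanJobSetup {
      public static Job createJob() throws Exception {
        Configuration conf = HBaseConfiguration.create();

        // Tag every Scan with its source table; getSplits() throws an
        // IOException for any scan that lacks this attribute.
        List<Scan> scans = new ArrayList<>();
        Scan scan1 = new Scan();
        scan1.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, Bytes.toBytes("table1"));
        scans.add(scan1);
        Scan scan2 = new Scan();
        scan2.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, Bytes.toBytes("table2"));
        scans.add(scan2);

        Job job = Job.getInstance(conf, "multi-table-scan");
        TableMapReduceUtil.initTableMapperJob(scans, MyMapper.class,
            Text.class, LongWritable.class, job);
        return job;
      }
    }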
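[Editor's note] The boundary clamping inside getSplits() is easiest to see in isolation:
each split covers the intersection of a region and a scan range, where a zero-length key
means "unbounded" (HBase's convention for the first region's start key and the last
region's end key). For example, a scan over [b, m) against a region [a, g) yields a split
over [b, g). A standalone restatement of that logic, for illustration only:

    import org.apache.hadoop.hbase.util.Bytes;

    final class SplitBoundaries {
      // Returns {splitStart, splitStop}: the intersection of the scan range
      // [startRow, stopRow) with the region [regionStart, regionEnd).
      static byte[][] clampToRegion(byte[] startRow, byte[] stopRow,
          byte[] regionStart, byte[] regionEnd) {
        // Start at the region boundary unless the scan starts later.
        byte[] splitStart = (startRow.length == 0
            || Bytes.compareTo(regionStart, startRow) >= 0) ? regionStart : startRow;
        // Stop at the region boundary unless the scan stops earlier; a
        // zero-length region end key means the region is unbounded above.
        byte[] splitStop = ((stopRow.length == 0
            || Bytes.compareTo(regionEnd, stopRow) <= 0) && regionEnd.length > 0)
            ? regionEnd : stopRow;
        return new byte[][] { splitStart, splitStop };
      }
    }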
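[Editor's note] includeRegionInSplit() is the extension point the javadoc describes for
excluding whole regions. A hedged sketch of the checkpoint-style override the javadoc
alludes to (the class name and checkpoint key are hypothetical, and the concrete
MultiTableInputFormat is used as the base for brevity):

    import org.apache.hadoop.hbase.mapreduce.MultiTableInputFormat;
    import org.apache.hadoop.hbase.util.Bytes;

    // Skips every region ending at or before a previously processed
    // checkpoint key, so those regions contribute no InputSplits at all.
    public class ResumingMultiTableInputFormat extends MultiTableInputFormat {
      private final byte[] checkpointKey = Bytes.toBytes("row-12345"); // placeholder

      @Override
      protected boolean includeRegionInSplit(byte[] startKey, byte[] endKey) {
        // endKey.length == 0 marks the last region of the table; always keep it.
        if (endKey.length == 0) {
          return true;
        }
        // Keep only regions whose end key lies beyond the checkpoint.
        return Bytes.compareTo(endKey, checkpointKey) > 0;
      }
    }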
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableOutputFormat.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableOutputFormat.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableOutputFormat.java
new file mode 100644
index 0000000..4cc784f
--- /dev/null
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableOutputFormat.java
@@ -0,0 +1,176 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.BufferedMutator;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * <p>
+ * Hadoop output format that writes to one or more HBase tables. The key is
+ * taken to be the table name while the output value <em>must</em> be either a
+ * {@link Put} or a {@link Delete} instance. All tables must already exist, and
+ * all Puts and Deletes must reference only valid column families.
+ * </p>
+ *
+ * <p>
+ * Write-ahead logging (WAL) for Puts can be disabled by setting
+ * {@link #WAL_PROPERTY} to {@link #WAL_OFF}. Default value is {@link #WAL_ON}.
+ * Note that disabling write-ahead logging is only appropriate for jobs where
+ * loss of data due to region server failure can be tolerated (for example,
+ * because it is easy to rerun a bulk import).
+ * </p>
+ */
+@InterfaceAudience.Public
+public class MultiTableOutputFormat extends OutputFormat<ImmutableBytesWritable, Mutation> {
+  /** Set this to {@link #WAL_OFF} to turn off write-ahead logging (WAL) */
+  public static final String WAL_PROPERTY = "hbase.mapreduce.multitableoutputformat.wal";
+  /** Property value to use write-ahead logging */
+  public static final boolean WAL_ON = true;
+  /** Property value to disable write-ahead logging */
+  public static final boolean WAL_OFF = false;
+
+  /**
+   * Record writer for outputting to multiple HTables.
+   */
+  protected static class MultiTableRecordWriter extends
+      RecordWriter<ImmutableBytesWritable, Mutation> {
+    private static final Log LOG = LogFactory.getLog(MultiTableRecordWriter.class);
+    Connection connection;
+    Map<ImmutableBytesWritable, BufferedMutator> mutatorMap = new HashMap<>();
+    Configuration conf;
+    boolean useWriteAheadLogging;
+
+    /**
+     * @param conf HBaseConfiguration to use
+     * @param useWriteAheadLogging whether to use write-ahead logging. This can
+     *          be turned off (<tt>false</tt>) to improve performance when bulk
+     *          loading data.
+     */
+    public MultiTableRecordWriter(Configuration conf,
+        boolean useWriteAheadLogging) throws IOException {
+      LOG.debug("Created new MultiTableRecordWriter with WAL "
+          + (useWriteAheadLogging ? "on" : "off"));
+      this.conf = conf;
+      this.useWriteAheadLogging = useWriteAheadLogging;
+    }
+
+    /**
+     * @param tableName the name of the table, as a string
+     * @return the named mutator
+     * @throws IOException if there is a problem opening a table
+     */
+    BufferedMutator getBufferedMutator(ImmutableBytesWritable tableName) throws IOException {
+      // Create the shared connection lazily, on the first write.
+      if (this.connection == null) {
+        this.connection = ConnectionFactory.createConnection(conf);
+      }
+      // Cache one BufferedMutator per destination table.
+      if (!mutatorMap.containsKey(tableName)) {
+        LOG.debug("Opening HTable \"" + Bytes.toString(tableName.get()) + "\" for writing");
+
+        BufferedMutator mutator =
+            connection.getBufferedMutator(TableName.valueOf(tableName.get()));
+        mutatorMap.put(tableName, mutator);
+      }
+      return mutatorMap.get(tableName);
+    }
+
+    @Override
+    public void close(TaskAttemptContext context) throws IOException {
+      for (BufferedMutator mutator : mutatorMap.values()) {
+        mutator.close();
+      }
+      if (connection != null) {
+        connection.close();
+      }
+    }
+
+    /**
+     * Writes an action (Put or Delete) to the specified table.
+     *
+     * @param tableName the table being updated.
+     * @param action the update, either a put or a delete.
+     * @throws IllegalArgumentException if the action is not a put or a delete.
+     */
+    @Override
+    public void write(ImmutableBytesWritable tableName, Mutation action) throws IOException {
+      BufferedMutator mutator = getBufferedMutator(tableName);
+      // The actions are not immutable, so we defensively copy them
+      if (action instanceof Put) {
+        Put put = new Put((Put) action);
+        put.setDurability(useWriteAheadLogging ? Durability.SYNC_WAL
+            : Durability.SKIP_WAL);
+        mutator.mutate(put);
+      } else if (action instanceof Delete) {
+        Delete delete = new Delete((Delete) action);
+        mutator.mutate(delete);
+      } else {
+        throw new IllegalArgumentException(
+            "action must be either Delete or Put");
+      }
+    }
+  }
+
+  @Override
+  public void checkOutputSpecs(JobContext context) throws IOException,
+      InterruptedException {
+    // we can't know ahead of time if it's going to blow up when the user
+    // passes a table name that doesn't exist, so nothing useful here.
+  }
+
+  @Override
+  public OutputCommitter getOutputCommitter(TaskAttemptContext context)
+      throws IOException, InterruptedException {
+    return new TableOutputCommitter();
+  }
+
+  @Override
+  public RecordWriter<ImmutableBytesWritable, Mutation> getRecordWriter(TaskAttemptContext context)
+      throws IOException, InterruptedException {
+    Configuration conf = context.getConfiguration();
+    return new MultiTableRecordWriter(HBaseConfiguration.create(conf),
+        conf.getBoolean(WAL_PROPERTY, WAL_ON));
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableSnapshotInputFormat.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableSnapshotInputFormat.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableSnapshotInputFormat.java
new file mode 100644
index 0000000..e7538a8
--- /dev/null
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableSnapshotInputFormat.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.mapreduce;
+
+import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * MultiTableSnapshotInputFormat generalizes {@link TableSnapshotInputFormat},
+ * allowing a MapReduce job to run over one or more table snapshots, with one
+ * or more scans configured for each. Internally, the input format delegates
+ * to {@link TableSnapshotInputFormat} and thus has the same performance
+ * advantages; see {@link TableSnapshotInputFormat} for more details.
+ * Usage is similar to TableSnapshotInputFormat, with the following exception:
+ * initMultiTableSnapshotMapperJob takes in a map from snapshot name to a
+ * collection of scans. For each snapshot in the map, each corresponding scan
+ * will be applied; the overall dataset for the job is defined by the
+ * concatenation of the regions and tables included in each snapshot/scan
+ * pair.
+ * {@link TableMapReduceUtil#initMultiTableSnapshotMapperJob(java.util.Map,
+ * Class, Class, Class, org.apache.hadoop.mapreduce.Job, boolean,
+ * org.apache.hadoop.fs.Path)}
+ * can be used to configure the job.
+ * <pre>{@code
+ * Job job = new Job(conf);
+ * Map<String, Collection<Scan>> snapshotScans = ImmutableMap.of(
+ *     "snapshot1", ImmutableList.of(new Scan(Bytes.toBytes("a"), Bytes.toBytes("b"))),
+ *     "snapshot2", ImmutableList.of(new Scan(Bytes.toBytes("1"), Bytes.toBytes("2")))
+ * );
+ * Path restoreDir = new Path("/tmp/snapshot_restore_dir");
+ * TableMapReduceUtil.initMultiTableSnapshotMapperJob(
+ *     snapshotScans, MyTableMapper.class, MyMapKeyOutput.class,
+ *     MyMapOutputValueWritable.class, job, true, restoreDir);
+ * }
+ * </pre>
+ * Internally, this input format restores each snapshot into a subdirectory of
+ * the given tmp directory. Input splits and record readers are created as
+ * described in {@link org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat}
+ * (one per region).
+ * See {@link TableSnapshotInputFormat} for more notes on permissioning; the
+ * same caveats apply here.
+ *
+ * @see TableSnapshotInputFormat
+ * @see org.apache.hadoop.hbase.client.TableSnapshotScanner
+ */
+@InterfaceAudience.Public
+public class MultiTableSnapshotInputFormat extends TableSnapshotInputFormat {
+
+  private final MultiTableSnapshotInputFormatImpl delegate;
+
+  public MultiTableSnapshotInputFormat() {
+    this.delegate = new MultiTableSnapshotInputFormatImpl();
+  }
+
+  @Override
+  public List<InputSplit> getSplits(JobContext jobContext)
+      throws IOException, InterruptedException {
+    List<TableSnapshotInputFormatImpl.InputSplit> splits =
+        delegate.getSplits(jobContext.getConfiguration());
+    List<InputSplit> rtn = Lists.newArrayListWithCapacity(splits.size());
+
+    for (TableSnapshotInputFormatImpl.InputSplit split : splits) {
+      rtn.add(new TableSnapshotInputFormat.TableSnapshotRegionSplit(split));
+    }
+
+    return rtn;
+  }
+
+  public static void setInput(Configuration configuration,
+      Map<String, Collection<Scan>> snapshotScans, Path tmpRestoreDir) throws IOException {
+    new MultiTableSnapshotInputFormatImpl().setInput(configuration, snapshotScans, tmpRestoreDir);
+  }
+}
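[Editor's note] For callers that do not go through TableMapReduceUtil, the static
setInput() above can configure a job directly. A sketch under that assumption (the
snapshot name and restore path are placeholders, `job` is assumed to exist, and
dependency jars may still need to be shipped separately, e.g. via
TableMapReduceUtil.addDependencyJars):

    Configuration conf = job.getConfiguration();
    Map<String, Collection<Scan>> snapshotScans = new HashMap<>();
    snapshotScans.put("snapshot1",
        Collections.singletonList(new Scan(Bytes.toBytes("a"), Bytes.toBytes("b"))));

    // Restores each snapshot under the given tmp directory and records the
    // snapshot-to-scan mapping in the job configuration.
    MultiTableSnapshotInputFormat.setInput(conf, snapshotScans,
        new Path("/tmp/snapshot_restore_dir"));
    job.setInputFormatClass(MultiTableSnapshotInputFormat.class);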
