http://git-wip-us.apache.org/repos/asf/hbase/blob/59d03410/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java deleted file mode 100644 index b5bb2ec..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java +++ /dev/null @@ -1,780 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.mapreduce; - -import java.io.ByteArrayInputStream; -import java.io.DataInput; -import java.io.DataInputStream; -import java.io.DataOutput; -import java.io.IOException; -import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Locale; -import java.util.Map; -import java.util.TreeMap; -import java.util.UUID; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.conf.Configured; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.CellComparator; -import org.apache.hadoop.hbase.CellUtil; -import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.KeyValueUtil; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.ZooKeeperConnectionException; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.client.Admin; -import org.apache.hadoop.hbase.client.Connection; -import org.apache.hadoop.hbase.client.ConnectionFactory; -import org.apache.hadoop.hbase.client.Delete; -import org.apache.hadoop.hbase.client.Durability; -import org.apache.hadoop.hbase.client.Mutation; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.client.RegionLocator; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.Table; -import org.apache.hadoop.hbase.filter.Filter; -import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.zookeeper.ZKClusterId; -import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; -import org.apache.hadoop.io.RawComparator; -import org.apache.hadoop.io.WritableComparable; -import org.apache.hadoop.io.WritableComparator; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.Partitioner; -import org.apache.hadoop.mapreduce.Reducer; -import 
org.apache.hadoop.mapreduce.TaskCounter; -import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; -import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; -import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; -import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner; -import org.apache.hadoop.util.Tool; -import org.apache.hadoop.util.ToolRunner; -import org.apache.zookeeper.KeeperException; - - -/** - * Import data written by {@link Export}. - */ [email protected] -public class Import extends Configured implements Tool { - private static final Log LOG = LogFactory.getLog(Import.class); - final static String NAME = "import"; - public final static String CF_RENAME_PROP = "HBASE_IMPORTER_RENAME_CFS"; - public final static String BULK_OUTPUT_CONF_KEY = "import.bulk.output"; - public final static String FILTER_CLASS_CONF_KEY = "import.filter.class"; - public final static String FILTER_ARGS_CONF_KEY = "import.filter.args"; - public final static String TABLE_NAME = "import.table.name"; - public final static String WAL_DURABILITY = "import.wal.durability"; - public final static String HAS_LARGE_RESULT= "import.bulk.hasLargeResult"; - - private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name"; - - public static class KeyValueWritableComparablePartitioner - extends Partitioner<KeyValueWritableComparable, KeyValue> { - private static KeyValueWritableComparable[] START_KEYS = null; - @Override - public int getPartition(KeyValueWritableComparable key, KeyValue value, - int numPartitions) { - for (int i = 0; i < START_KEYS.length; ++i) { - if (key.compareTo(START_KEYS[i]) <= 0) { - return i; - } - } - return START_KEYS.length; - } - - } - - public static class KeyValueWritableComparable - implements WritableComparable<KeyValueWritableComparable> { - - private KeyValue kv = null; - - static { - // register this comparator - WritableComparator.define(KeyValueWritableComparable.class, - new KeyValueWritableComparator()); - } - - public KeyValueWritableComparable() { - } - - public KeyValueWritableComparable(KeyValue kv) { - this.kv = kv; - } - - @Override - public void write(DataOutput out) throws IOException { - KeyValue.write(kv, out); - } - - @Override - public void readFields(DataInput in) throws IOException { - kv = KeyValue.create(in); - } - - @Override - @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="EQ_COMPARETO_USE_OBJECT_EQUALS", - justification="This is wrong, yes, but we should be purging Writables, not fixing them") - public int compareTo(KeyValueWritableComparable o) { - return CellComparator.COMPARATOR.compare(this.kv, ((KeyValueWritableComparable)o).kv); - } - - public static class KeyValueWritableComparator extends WritableComparator { - - @Override - public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { - try { - KeyValueWritableComparable kv1 = new KeyValueWritableComparable(); - kv1.readFields(new DataInputStream(new ByteArrayInputStream(b1, s1, l1))); - KeyValueWritableComparable kv2 = new KeyValueWritableComparable(); - kv2.readFields(new DataInputStream(new ByteArrayInputStream(b2, s2, l2))); - return compare(kv1, kv2); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - } - - } - - public static class KeyValueReducer - extends - Reducer<KeyValueWritableComparable, KeyValue, ImmutableBytesWritable, KeyValue> { - protected void reduce( - KeyValueWritableComparable row, - Iterable<KeyValue> kvs, - Reducer<KeyValueWritableComparable, - KeyValue, ImmutableBytesWritable, 
KeyValue>.Context context) - throws java.io.IOException, InterruptedException { - int index = 0; - for (KeyValue kv : kvs) { - context.write(new ImmutableBytesWritable(kv.getRowArray()), kv); - if (++index % 100 == 0) - context.setStatus("Wrote " + index + " KeyValues, " - + "and the rowkey whose is being wrote is " + Bytes.toString(kv.getRowArray())); - } - } - } - - public static class KeyValueSortImporter - extends TableMapper<KeyValueWritableComparable, KeyValue> { - private Map<byte[], byte[]> cfRenameMap; - private Filter filter; - private static final Log LOG = LogFactory.getLog(KeyValueImporter.class); - - /** - * @param row The current table row key. - * @param value The columns. - * @param context The current context. - * @throws IOException When something is broken with the data. - */ - @Override - public void map(ImmutableBytesWritable row, Result value, - Context context) - throws IOException { - try { - if (LOG.isTraceEnabled()) { - LOG.trace("Considering the row." - + Bytes.toString(row.get(), row.getOffset(), row.getLength())); - } - if (filter == null - || !filter.filterRowKey(CellUtil.createFirstOnRow(row.get(), row.getOffset(), - (short) row.getLength()))) { - for (Cell kv : value.rawCells()) { - kv = filterKv(filter, kv); - // skip if we filtered it out - if (kv == null) continue; - // TODO get rid of ensureKeyValue - KeyValue ret = KeyValueUtil.ensureKeyValue(convertKv(kv, cfRenameMap)); - context.write(new KeyValueWritableComparable(ret.createKeyOnly(false)), ret); - } - } - } catch (InterruptedException e) { - e.printStackTrace(); - } - } - - @Override - public void setup(Context context) throws IOException { - cfRenameMap = createCfRenameMap(context.getConfiguration()); - filter = instantiateFilter(context.getConfiguration()); - int reduceNum = context.getNumReduceTasks(); - Configuration conf = context.getConfiguration(); - TableName tableName = TableName.valueOf(context.getConfiguration().get(TABLE_NAME)); - try (Connection conn = ConnectionFactory.createConnection(conf); - RegionLocator regionLocator = conn.getRegionLocator(tableName)) { - byte[][] startKeys = regionLocator.getStartKeys(); - if (startKeys.length != reduceNum) { - throw new IOException("Region split after job initialization"); - } - KeyValueWritableComparable[] startKeyWraps = - new KeyValueWritableComparable[startKeys.length - 1]; - for (int i = 1; i < startKeys.length; ++i) { - startKeyWraps[i - 1] = - new KeyValueWritableComparable(KeyValueUtil.createFirstOnRow(startKeys[i])); - } - KeyValueWritableComparablePartitioner.START_KEYS = startKeyWraps; - } - } - } - - /** - * A mapper that just writes out KeyValues. - */ - @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="EQ_COMPARETO_USE_OBJECT_EQUALS", - justification="Writables are going away and this has been this way forever") - public static class KeyValueImporter extends TableMapper<ImmutableBytesWritable, KeyValue> { - private Map<byte[], byte[]> cfRenameMap; - private Filter filter; - private static final Log LOG = LogFactory.getLog(KeyValueImporter.class); - - /** - * @param row The current table row key. - * @param value The columns. - * @param context The current context. - * @throws IOException When something is broken with the data. - */ - @Override - public void map(ImmutableBytesWritable row, Result value, - Context context) - throws IOException { - try { - if (LOG.isTraceEnabled()) { - LOG.trace("Considering the row." 
- + Bytes.toString(row.get(), row.getOffset(), row.getLength())); - } - if (filter == null - || !filter.filterRowKey(CellUtil.createFirstOnRow(row.get(), row.getOffset(), - (short) row.getLength()))) { - for (Cell kv : value.rawCells()) { - kv = filterKv(filter, kv); - // skip if we filtered it out - if (kv == null) continue; - // TODO get rid of ensureKeyValue - context.write(row, KeyValueUtil.ensureKeyValue(convertKv(kv, cfRenameMap))); - } - } - } catch (InterruptedException e) { - e.printStackTrace(); - } - } - - @Override - public void setup(Context context) { - cfRenameMap = createCfRenameMap(context.getConfiguration()); - filter = instantiateFilter(context.getConfiguration()); - } - } - - /** - * Write table content out to files in hdfs. - */ - public static class Importer extends TableMapper<ImmutableBytesWritable, Mutation> { - private Map<byte[], byte[]> cfRenameMap; - private List<UUID> clusterIds; - private Filter filter; - private Durability durability; - - /** - * @param row The current table row key. - * @param value The columns. - * @param context The current context. - * @throws IOException When something is broken with the data. - */ - @Override - public void map(ImmutableBytesWritable row, Result value, - Context context) - throws IOException { - try { - writeResult(row, value, context); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } - - private void writeResult(ImmutableBytesWritable key, Result result, Context context) - throws IOException, InterruptedException { - Put put = null; - Delete delete = null; - if (LOG.isTraceEnabled()) { - LOG.trace("Considering the row." - + Bytes.toString(key.get(), key.getOffset(), key.getLength())); - } - if (filter == null - || !filter.filterRowKey(CellUtil.createFirstOnRow(key.get(), key.getOffset(), - (short) key.getLength()))) { - processKV(key, result, context, put, delete); - } - } - - protected void processKV(ImmutableBytesWritable key, Result result, Context context, Put put, - Delete delete) throws IOException, InterruptedException { - for (Cell kv : result.rawCells()) { - kv = filterKv(filter, kv); - // skip if we filter it out - if (kv == null) continue; - - kv = convertKv(kv, cfRenameMap); - // Deletes and Puts are gathered and written when finished - /* - * If there are sequence of mutations and tombstones in an Export, and after Import the same - * sequence should be restored as it is. If we combine all Delete tombstones into single - * request then there is chance of ignoring few DeleteFamily tombstones, because if we - * submit multiple DeleteFamily tombstones in single Delete request then we are maintaining - * only newest in hbase table and ignoring other. 
Check - HBASE-12065 - */ - if (CellUtil.isDeleteFamily(kv)) { - Delete deleteFamily = new Delete(key.get()); - deleteFamily.add(kv); - if (durability != null) { - deleteFamily.setDurability(durability); - } - deleteFamily.setClusterIds(clusterIds); - context.write(key, deleteFamily); - } else if (CellUtil.isDelete(kv)) { - if (delete == null) { - delete = new Delete(key.get()); - } - delete.add(kv); - } else { - if (put == null) { - put = new Put(key.get()); - } - addPutToKv(put, kv); - } - } - if (put != null) { - if (durability != null) { - put.setDurability(durability); - } - put.setClusterIds(clusterIds); - context.write(key, put); - } - if (delete != null) { - if (durability != null) { - delete.setDurability(durability); - } - delete.setClusterIds(clusterIds); - context.write(key, delete); - } - } - - protected void addPutToKv(Put put, Cell kv) throws IOException { - put.add(kv); - } - - @Override - public void setup(Context context) { - LOG.info("Setting up " + getClass() + " mapper."); - Configuration conf = context.getConfiguration(); - cfRenameMap = createCfRenameMap(conf); - filter = instantiateFilter(conf); - String durabilityStr = conf.get(WAL_DURABILITY); - if(durabilityStr != null){ - durability = Durability.valueOf(durabilityStr.toUpperCase(Locale.ROOT)); - LOG.info("setting WAL durability to " + durability); - } else { - LOG.info("setting WAL durability to default."); - } - // TODO: This is kind of ugly doing setup of ZKW just to read the clusterid. - ZooKeeperWatcher zkw = null; - Exception ex = null; - try { - zkw = new ZooKeeperWatcher(conf, context.getTaskAttemptID().toString(), null); - clusterIds = Collections.singletonList(ZKClusterId.getUUIDForCluster(zkw)); - } catch (ZooKeeperConnectionException e) { - ex = e; - LOG.error("Problem connecting to ZooKeper during task setup", e); - } catch (KeeperException e) { - ex = e; - LOG.error("Problem reading ZooKeeper data during task setup", e); - } catch (IOException e) { - ex = e; - LOG.error("Problem setting up task", e); - } finally { - if (zkw != null) zkw.close(); - } - if (clusterIds == null) { - // exit early if setup fails - throw new RuntimeException(ex); - } - } - } - - /** - * Create a {@link Filter} to apply to all incoming keys ({@link KeyValue KeyValues}) to - * optionally not include in the job output - * @param conf {@link Configuration} from which to load the filter - * @return the filter to use for the task, or <tt>null</tt> if no filter to should be used - * @throws IllegalArgumentException if the filter is misconfigured - */ - public static Filter instantiateFilter(Configuration conf) { - // get the filter, if it was configured - Class<? 
extends Filter> filterClass = conf.getClass(FILTER_CLASS_CONF_KEY, null, Filter.class); - if (filterClass == null) { - LOG.debug("No configured filter class, accepting all keyvalues."); - return null; - } - LOG.debug("Attempting to create filter:" + filterClass); - String[] filterArgs = conf.getStrings(FILTER_ARGS_CONF_KEY); - ArrayList<byte[]> quotedArgs = toQuotedByteArrays(filterArgs); - try { - Method m = filterClass.getMethod("createFilterFromArguments", ArrayList.class); - return (Filter) m.invoke(null, quotedArgs); - } catch (IllegalAccessException e) { - LOG.error("Couldn't instantiate filter!", e); - throw new RuntimeException(e); - } catch (SecurityException e) { - LOG.error("Couldn't instantiate filter!", e); - throw new RuntimeException(e); - } catch (NoSuchMethodException e) { - LOG.error("Couldn't instantiate filter!", e); - throw new RuntimeException(e); - } catch (IllegalArgumentException e) { - LOG.error("Couldn't instantiate filter!", e); - throw new RuntimeException(e); - } catch (InvocationTargetException e) { - LOG.error("Couldn't instantiate filter!", e); - throw new RuntimeException(e); - } - } - - private static ArrayList<byte[]> toQuotedByteArrays(String... stringArgs) { - ArrayList<byte[]> quotedArgs = new ArrayList<>(); - for (String stringArg : stringArgs) { - // all the filters' instantiation methods expected quoted args since they are coming from - // the shell, so add them here, though it shouldn't really be needed :-/ - quotedArgs.add(Bytes.toBytes("'" + stringArg + "'")); - } - return quotedArgs; - } - - /** - * Attempt to filter out the keyvalue - * @param kv {@link KeyValue} on which to apply the filter - * @return <tt>null</tt> if the key should not be written, otherwise returns the original - * {@link KeyValue} - */ - public static Cell filterKv(Filter filter, Cell kv) throws IOException { - // apply the filter and skip this kv if the filter doesn't apply - if (filter != null) { - Filter.ReturnCode code = filter.filterKeyValue(kv); - if (LOG.isTraceEnabled()) { - LOG.trace("Filter returned:" + code + " for the key value:" + kv); - } - // if its not an accept type, then skip this kv - if (!(code.equals(Filter.ReturnCode.INCLUDE) || code - .equals(Filter.ReturnCode.INCLUDE_AND_NEXT_COL))) { - return null; - } - } - return kv; - } - - // helper: create a new KeyValue based on CF rename map - private static Cell convertKv(Cell kv, Map<byte[], byte[]> cfRenameMap) { - if(cfRenameMap != null) { - // If there's a rename mapping for this CF, create a new KeyValue - byte[] newCfName = cfRenameMap.get(CellUtil.cloneFamily(kv)); - if(newCfName != null) { - kv = new KeyValue(kv.getRowArray(), // row buffer - kv.getRowOffset(), // row offset - kv.getRowLength(), // row length - newCfName, // CF buffer - 0, // CF offset - newCfName.length, // CF length - kv.getQualifierArray(), // qualifier buffer - kv.getQualifierOffset(), // qualifier offset - kv.getQualifierLength(), // qualifier length - kv.getTimestamp(), // timestamp - KeyValue.Type.codeToType(kv.getTypeByte()), // KV Type - kv.getValueArray(), // value buffer - kv.getValueOffset(), // value offset - kv.getValueLength()); // value length - } - } - return kv; - } - - // helper: make a map from sourceCfName to destCfName by parsing a config key - private static Map<byte[], byte[]> createCfRenameMap(Configuration conf) { - Map<byte[], byte[]> cfRenameMap = null; - String allMappingsPropVal = conf.get(CF_RENAME_PROP); - if(allMappingsPropVal != null) { - // The conf value format should be 
sourceCf1:destCf1,sourceCf2:destCf2,... - String[] allMappings = allMappingsPropVal.split(","); - for (String mapping: allMappings) { - if(cfRenameMap == null) { - cfRenameMap = new TreeMap<>(Bytes.BYTES_COMPARATOR); - } - String [] srcAndDest = mapping.split(":"); - if(srcAndDest.length != 2) { - continue; - } - cfRenameMap.put(srcAndDest[0].getBytes(), srcAndDest[1].getBytes()); - } - } - return cfRenameMap; - } - - /** - * <p>Sets a configuration property with key {@link #CF_RENAME_PROP} in conf that tells - * the mapper how to rename column families. - * - * <p>Alternately, instead of calling this function, you could set the configuration key - * {@link #CF_RENAME_PROP} yourself. The value should look like - * <pre>srcCf1:destCf1,srcCf2:destCf2,....</pre>. This would have the same effect on - * the mapper behavior. - * - * @param conf the Configuration in which the {@link #CF_RENAME_PROP} key will be - * set - * @param renameMap a mapping from source CF names to destination CF names - */ - static public void configureCfRenaming(Configuration conf, - Map<String, String> renameMap) { - StringBuilder sb = new StringBuilder(); - for(Map.Entry<String,String> entry: renameMap.entrySet()) { - String sourceCf = entry.getKey(); - String destCf = entry.getValue(); - - if(sourceCf.contains(":") || sourceCf.contains(",") || - destCf.contains(":") || destCf.contains(",")) { - throw new IllegalArgumentException("Illegal character in CF names: " - + sourceCf + ", " + destCf); - } - - if(sb.length() != 0) { - sb.append(","); - } - sb.append(sourceCf + ":" + destCf); - } - conf.set(CF_RENAME_PROP, sb.toString()); - } - - /** - * Add a Filter to be instantiated on import - * @param conf Configuration to update (will be passed to the job) - * @param clazz {@link Filter} subclass to instantiate on the server. - * @param filterArgs List of arguments to pass to the filter on instantiation - */ - public static void addFilterAndArguments(Configuration conf, Class<? extends Filter> clazz, - List<String> filterArgs) throws IOException { - conf.set(Import.FILTER_CLASS_CONF_KEY, clazz.getName()); - conf.setStrings(Import.FILTER_ARGS_CONF_KEY, filterArgs.toArray(new String[filterArgs.size()])); - } - - /** - * Sets up the actual job. - * @param conf The current configuration. - * @param args The command line parameters. - * @return The newly created job. - * @throws IOException When setting up the job fails. - */ - public static Job createSubmittableJob(Configuration conf, String[] args) - throws IOException { - TableName tableName = TableName.valueOf(args[0]); - conf.set(TABLE_NAME, tableName.getNameAsString()); - Path inputDir = new Path(args[1]); - Job job = Job.getInstance(conf, conf.get(JOB_NAME_CONF_KEY, NAME + "_" + tableName)); - job.setJarByClass(Importer.class); - FileInputFormat.setInputPaths(job, inputDir); - job.setInputFormatClass(SequenceFileInputFormat.class); - String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY); - - // make sure we get the filter in the jars - try { - Class<? 
extends Filter> filter = conf.getClass(FILTER_CLASS_CONF_KEY, null, Filter.class); - if (filter != null) { - TableMapReduceUtil.addDependencyJarsForClasses(conf, filter); - } - } catch (Exception e) { - throw new IOException(e); - } - - if (hfileOutPath != null && conf.getBoolean(HAS_LARGE_RESULT, false)) { - LOG.info("Use Large Result!!"); - try (Connection conn = ConnectionFactory.createConnection(conf); - Table table = conn.getTable(tableName); - RegionLocator regionLocator = conn.getRegionLocator(tableName)) { - HFileOutputFormat2.configureIncrementalLoad(job, table.getDescriptor(), regionLocator); - job.setMapperClass(KeyValueSortImporter.class); - job.setReducerClass(KeyValueReducer.class); - Path outputDir = new Path(hfileOutPath); - FileOutputFormat.setOutputPath(job, outputDir); - job.setMapOutputKeyClass(KeyValueWritableComparable.class); - job.setMapOutputValueClass(KeyValue.class); - job.getConfiguration().setClass("mapreduce.job.output.key.comparator.class", - KeyValueWritableComparable.KeyValueWritableComparator.class, - RawComparator.class); - Path partitionsPath = - new Path(TotalOrderPartitioner.getPartitionFile(job.getConfiguration())); - FileSystem fs = FileSystem.get(job.getConfiguration()); - fs.deleteOnExit(partitionsPath); - job.setPartitionerClass(KeyValueWritableComparablePartitioner.class); - job.setNumReduceTasks(regionLocator.getStartKeys().length); - TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(), - org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions.class); - } - } else if (hfileOutPath != null) { - LOG.info("writing to hfiles for bulk load."); - job.setMapperClass(KeyValueImporter.class); - try (Connection conn = ConnectionFactory.createConnection(conf); - Table table = conn.getTable(tableName); - RegionLocator regionLocator = conn.getRegionLocator(tableName)){ - job.setReducerClass(KeyValueSortReducer.class); - Path outputDir = new Path(hfileOutPath); - FileOutputFormat.setOutputPath(job, outputDir); - job.setMapOutputKeyClass(ImmutableBytesWritable.class); - job.setMapOutputValueClass(KeyValue.class); - HFileOutputFormat2.configureIncrementalLoad(job, table.getDescriptor(), regionLocator); - TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(), - org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions.class); - } - } else { - LOG.info("writing directly to table from Mapper."); - // No reducers. Just write straight to table. Call initTableReducerJob - // because it sets up the TableOutputFormat. - job.setMapperClass(Importer.class); - TableMapReduceUtil.initTableReducerJob(tableName.getNameAsString(), null, job); - job.setNumReduceTasks(0); - } - return job; - } - - /* - * @param errorMsg Error message. Can be null. - */ - private static void usage(final String errorMsg) { - if (errorMsg != null && errorMsg.length() > 0) { - System.err.println("ERROR: " + errorMsg); - } - System.err.println("Usage: Import [options] <tablename> <inputdir>"); - System.err.println("By default Import will load data directly into HBase. 
To instead generate"); - System.err.println("HFiles of data to prepare for a bulk data load, pass the option:"); - System.err.println(" -D" + BULK_OUTPUT_CONF_KEY + "=/path/for/output"); - System.err.println("If there is a large result that includes too much KeyValue " - + "whitch can occur OOME caused by the memery sort in reducer, pass the option:"); - System.err.println(" -D" + HAS_LARGE_RESULT + "=true"); - System.err - .println(" To apply a generic org.apache.hadoop.hbase.filter.Filter to the input, use"); - System.err.println(" -D" + FILTER_CLASS_CONF_KEY + "=<name of filter class>"); - System.err.println(" -D" + FILTER_ARGS_CONF_KEY + "=<comma separated list of args for filter"); - System.err.println(" NOTE: The filter will be applied BEFORE doing key renames via the " - + CF_RENAME_PROP + " property. Futher, filters will only use the" - + " Filter#filterRowKey(byte[] buffer, int offset, int length) method to identify " - + " whether the current row needs to be ignored completely for processing and " - + " Filter#filterKeyValue(KeyValue) method to determine if the KeyValue should be added;" - + " Filter.ReturnCode#INCLUDE and #INCLUDE_AND_NEXT_COL will be considered as including" - + " the KeyValue."); - System.err.println("To import data exported from HBase 0.94, use"); - System.err.println(" -Dhbase.import.version=0.94"); - System.err.println(" -D " + JOB_NAME_CONF_KEY - + "=jobName - use the specified mapreduce job name for the import"); - System.err.println("For performance consider the following options:\n" - + " -Dmapreduce.map.speculative=false\n" - + " -Dmapreduce.reduce.speculative=false\n" - + " -D" + WAL_DURABILITY + "=<Used while writing data to hbase." - +" Allowed values are the supported durability values" - +" like SKIP_WAL/ASYNC_WAL/SYNC_WAL/...>"); - } - - /** - * If the durability is set to {@link Durability#SKIP_WAL} and the data is imported to hbase, we - * need to flush all the regions of the table as the data is held in memory and is also not - * present in the Write Ahead Log to replay in scenarios of a crash. This method flushes all the - * regions of the table in the scenarios of import data to hbase with {@link Durability#SKIP_WAL} - */ - public static void flushRegionsIfNecessary(Configuration conf) throws IOException, - InterruptedException { - String tableName = conf.get(TABLE_NAME); - Admin hAdmin = null; - Connection connection = null; - String durability = conf.get(WAL_DURABILITY); - // Need to flush if the data is written to hbase and skip wal is enabled. 
- if (conf.get(BULK_OUTPUT_CONF_KEY) == null && durability != null - && Durability.SKIP_WAL.name().equalsIgnoreCase(durability)) { - LOG.info("Flushing all data that skipped the WAL."); - try { - connection = ConnectionFactory.createConnection(conf); - hAdmin = connection.getAdmin(); - hAdmin.flush(TableName.valueOf(tableName)); - } finally { - if (hAdmin != null) { - hAdmin.close(); - } - if (connection != null) { - connection.close(); - } - } - } - } - - @Override - public int run(String[] args) throws Exception { - if (args.length < 2) { - usage("Wrong number of arguments: " + args.length); - return -1; - } - String inputVersionString = System.getProperty(ResultSerialization.IMPORT_FORMAT_VER); - if (inputVersionString != null) { - getConf().set(ResultSerialization.IMPORT_FORMAT_VER, inputVersionString); - } - Job job = createSubmittableJob(getConf(), args); - boolean isJobSuccessful = job.waitForCompletion(true); - if(isJobSuccessful){ - // Flush all the regions of the table - flushRegionsIfNecessary(getConf()); - } - long inputRecords = job.getCounters().findCounter(TaskCounter.MAP_INPUT_RECORDS).getValue(); - long outputRecords = job.getCounters().findCounter(TaskCounter.MAP_OUTPUT_RECORDS).getValue(); - if (outputRecords < inputRecords) { - System.err.println("Warning, not all records were imported (maybe filtered out)."); - if (outputRecords == 0) { - System.err.println("If the data was exported from HBase 0.94 "+ - "consider using -Dhbase.import.version=0.94."); - } - } - - return (isJobSuccessful ? 0 : 1); - } - - /** - * Main entry point. - * @param args The command line parameters. - * @throws Exception When running the job fails. - */ - public static void main(String[] args) throws Exception { - int errCode = ToolRunner.run(HBaseConfiguration.create(), new Import(), args); - System.exit(errCode); - } - -}
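The Import tool removed above exposes two driver-side hooks that its usage text only hints at: configureCfRenaming(), which populates the HBASE_IMPORTER_RENAME_CFS property the mappers read in setup(), and addFilterAndArguments(), which wires a server-side Filter into the job. The following is a minimal, hypothetical driver sketch against the API exactly as it appears in the file removed from hbase-server by this diff; the table name, input path, column-family names and the choice of PrefixFilter are illustrative assumptions, not part of this commit.

// Hypothetical driver for the Import tool shown above; table, paths and
// column-family names are made up for illustration only.
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.filter.PrefixFilter;
import org.apache.hadoop.hbase.mapreduce.Import;
import org.apache.hadoop.util.ToolRunner;

public class ImportDriverSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();

    // Rename column family "cf_old" to "cf_new" on the way in; this sets the
    // HBASE_IMPORTER_RENAME_CFS property that convertKv() consults per cell.
    Map<String, String> renames = new HashMap<>();
    renames.put("cf_old", "cf_new");
    Import.configureCfRenaming(conf, renames);

    // Apply a server-side filter before any renaming: rows are skipped via
    // filterRowKey() and individual cells via filterKeyValue(), as the usage
    // text above notes. The argument list is handed to the filter's static
    // createFilterFromArguments method.
    Import.addFilterAndArguments(conf, PrefixFilter.class, Arrays.asList("user"));

    // Expected args: <tablename> <inputdir>, e.g. "mytable" "/export/mytable".
    System.exit(ToolRunner.run(conf, new Import(), args));
  }
}

The same configuration keys drive the bulk-load path shown in createSubmittableJob(): passing -D import.bulk.output=/path/for/output switches the job to the HFileOutputFormat2 branch instead of writing directly to the table, and adding -D import.bulk.hasLargeResult=true routes through the KeyValueSortImporter/KeyValueReducer pair to avoid sorting a large result in reducer memory.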
http://git-wip-us.apache.org/repos/asf/hbase/blob/59d03410/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java deleted file mode 100644 index b64271e..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java +++ /dev/null @@ -1,793 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.mapreduce; - -import static java.lang.String.format; - -import java.io.File; -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashSet; -import java.util.Set; - -import org.apache.commons.lang.StringUtils; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.conf.Configured; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.HColumnDescriptor; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.TableNotEnabledException; -import org.apache.hadoop.hbase.TableNotFoundException; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.client.Admin; -import org.apache.hadoop.hbase.client.Connection; -import org.apache.hadoop.hbase.client.ConnectionFactory; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.client.RegionLocator; -import org.apache.hadoop.hbase.client.Table; -import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.hbase.util.Base64; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.Pair; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; -import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; -import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; -import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; -import org.apache.hadoop.security.Credentials; -import org.apache.hadoop.util.Tool; -import org.apache.hadoop.util.ToolRunner; - -import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; -import org.apache.hadoop.hbase.client.TableDescriptor; -import org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions; -import org.apache.hadoop.hbase.shaded.com.google.common.base.Splitter; -import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists; - -/** - * 
Tool to import data from a TSV file. - * - * This tool is rather simplistic - it doesn't do any quoting or - * escaping, but is useful for many data loads. - * - * @see ImportTsv#usage(String) - */ [email protected] -public class ImportTsv extends Configured implements Tool { - - protected static final Log LOG = LogFactory.getLog(ImportTsv.class); - - final static String NAME = "importtsv"; - - public final static String MAPPER_CONF_KEY = "importtsv.mapper.class"; - public final static String BULK_OUTPUT_CONF_KEY = "importtsv.bulk.output"; - public final static String TIMESTAMP_CONF_KEY = "importtsv.timestamp"; - public final static String JOB_NAME_CONF_KEY = "mapreduce.job.name"; - // TODO: the rest of these configs are used exclusively by TsvImporterMapper. - // Move them out of the tool and let the mapper handle its own validation. - public final static String DRY_RUN_CONF_KEY = "importtsv.dry.run"; - // If true, bad lines are logged to stderr. Default: false. - public final static String LOG_BAD_LINES_CONF_KEY = "importtsv.log.bad.lines"; - public final static String SKIP_LINES_CONF_KEY = "importtsv.skip.bad.lines"; - public final static String SKIP_EMPTY_COLUMNS = "importtsv.skip.empty.columns"; - public final static String COLUMNS_CONF_KEY = "importtsv.columns"; - public final static String SEPARATOR_CONF_KEY = "importtsv.separator"; - public final static String ATTRIBUTE_SEPERATOR_CONF_KEY = "attributes.seperator"; - //This config is used to propagate credentials from parent MR jobs which launch - //ImportTSV jobs. SEE IntegrationTestImportTsv. - public final static String CREDENTIALS_LOCATION = "credentials_location"; - final static String DEFAULT_SEPARATOR = "\t"; - final static String DEFAULT_ATTRIBUTES_SEPERATOR = "=>"; - final static String DEFAULT_MULTIPLE_ATTRIBUTES_SEPERATOR = ","; - final static Class DEFAULT_MAPPER = TsvImporterMapper.class; - public final static String CREATE_TABLE_CONF_KEY = "create.table"; - public final static String NO_STRICT_COL_FAMILY = "no.strict"; - /** - * If table didn't exist and was created in dry-run mode, this flag is - * flipped to delete it when MR ends. - */ - private static boolean DRY_RUN_TABLE_CREATED; - - public static class TsvParser { - /** - * Column families and qualifiers mapped to the TSV columns - */ - private final byte[][] families; - private final byte[][] qualifiers; - - private final byte separatorByte; - - private int rowKeyColumnIndex; - - private int maxColumnCount; - - // Default value must be negative - public static final int DEFAULT_TIMESTAMP_COLUMN_INDEX = -1; - - private int timestampKeyColumnIndex = DEFAULT_TIMESTAMP_COLUMN_INDEX; - - public static final String ROWKEY_COLUMN_SPEC = "HBASE_ROW_KEY"; - - public static final String TIMESTAMPKEY_COLUMN_SPEC = "HBASE_TS_KEY"; - - public static final String ATTRIBUTES_COLUMN_SPEC = "HBASE_ATTRIBUTES_KEY"; - - public static final String CELL_VISIBILITY_COLUMN_SPEC = "HBASE_CELL_VISIBILITY"; - - public static final String CELL_TTL_COLUMN_SPEC = "HBASE_CELL_TTL"; - - private int attrKeyColumnIndex = DEFAULT_ATTRIBUTES_COLUMN_INDEX; - - public static final int DEFAULT_ATTRIBUTES_COLUMN_INDEX = -1; - - public static final int DEFAULT_CELL_VISIBILITY_COLUMN_INDEX = -1; - - public static final int DEFAULT_CELL_TTL_COLUMN_INDEX = -1; - - private int cellVisibilityColumnIndex = DEFAULT_CELL_VISIBILITY_COLUMN_INDEX; - - private int cellTTLColumnIndex = DEFAULT_CELL_TTL_COLUMN_INDEX; - - /** - * @param columnsSpecification the list of columns to parser out, comma separated. 
- * The row key should be the special token TsvParser.ROWKEY_COLUMN_SPEC - * @param separatorStr - */ - public TsvParser(String columnsSpecification, String separatorStr) { - // Configure separator - byte[] separator = Bytes.toBytes(separatorStr); - Preconditions.checkArgument(separator.length == 1, - "TsvParser only supports single-byte separators"); - separatorByte = separator[0]; - - // Configure columns - ArrayList<String> columnStrings = Lists.newArrayList( - Splitter.on(',').trimResults().split(columnsSpecification)); - - maxColumnCount = columnStrings.size(); - families = new byte[maxColumnCount][]; - qualifiers = new byte[maxColumnCount][]; - - for (int i = 0; i < columnStrings.size(); i++) { - String str = columnStrings.get(i); - if (ROWKEY_COLUMN_SPEC.equals(str)) { - rowKeyColumnIndex = i; - continue; - } - if (TIMESTAMPKEY_COLUMN_SPEC.equals(str)) { - timestampKeyColumnIndex = i; - continue; - } - if (ATTRIBUTES_COLUMN_SPEC.equals(str)) { - attrKeyColumnIndex = i; - continue; - } - if (CELL_VISIBILITY_COLUMN_SPEC.equals(str)) { - cellVisibilityColumnIndex = i; - continue; - } - if (CELL_TTL_COLUMN_SPEC.equals(str)) { - cellTTLColumnIndex = i; - continue; - } - String[] parts = str.split(":", 2); - if (parts.length == 1) { - families[i] = str.getBytes(); - qualifiers[i] = HConstants.EMPTY_BYTE_ARRAY; - } else { - families[i] = parts[0].getBytes(); - qualifiers[i] = parts[1].getBytes(); - } - } - } - - public boolean hasTimestamp() { - return timestampKeyColumnIndex != DEFAULT_TIMESTAMP_COLUMN_INDEX; - } - - public int getTimestampKeyColumnIndex() { - return timestampKeyColumnIndex; - } - - public boolean hasAttributes() { - return attrKeyColumnIndex != DEFAULT_ATTRIBUTES_COLUMN_INDEX; - } - - public boolean hasCellVisibility() { - return cellVisibilityColumnIndex != DEFAULT_CELL_VISIBILITY_COLUMN_INDEX; - } - - public boolean hasCellTTL() { - return cellTTLColumnIndex != DEFAULT_CELL_VISIBILITY_COLUMN_INDEX; - } - - public int getAttributesKeyColumnIndex() { - return attrKeyColumnIndex; - } - - public int getCellVisibilityColumnIndex() { - return cellVisibilityColumnIndex; - } - - public int getCellTTLColumnIndex() { - return cellTTLColumnIndex; - } - - public int getRowKeyColumnIndex() { - return rowKeyColumnIndex; - } - - public byte[] getFamily(int idx) { - return families[idx]; - } - public byte[] getQualifier(int idx) { - return qualifiers[idx]; - } - - public ParsedLine parse(byte[] lineBytes, int length) - throws BadTsvLineException { - // Enumerate separator offsets - ArrayList<Integer> tabOffsets = new ArrayList<>(maxColumnCount); - for (int i = 0; i < length; i++) { - if (lineBytes[i] == separatorByte) { - tabOffsets.add(i); - } - } - if (tabOffsets.isEmpty()) { - throw new BadTsvLineException("No delimiter"); - } - - tabOffsets.add(length); - - if (tabOffsets.size() > maxColumnCount) { - throw new BadTsvLineException("Excessive columns"); - } else if (tabOffsets.size() <= getRowKeyColumnIndex()) { - throw new BadTsvLineException("No row key"); - } else if (hasTimestamp() - && tabOffsets.size() <= getTimestampKeyColumnIndex()) { - throw new BadTsvLineException("No timestamp"); - } else if (hasAttributes() && tabOffsets.size() <= getAttributesKeyColumnIndex()) { - throw new BadTsvLineException("No attributes specified"); - } else if (hasCellVisibility() && tabOffsets.size() <= getCellVisibilityColumnIndex()) { - throw new BadTsvLineException("No cell visibility specified"); - } else if (hasCellTTL() && tabOffsets.size() <= getCellTTLColumnIndex()) { - throw new 
BadTsvLineException("No cell TTL specified"); - } - return new ParsedLine(tabOffsets, lineBytes); - } - - class ParsedLine { - private final ArrayList<Integer> tabOffsets; - private byte[] lineBytes; - - ParsedLine(ArrayList<Integer> tabOffsets, byte[] lineBytes) { - this.tabOffsets = tabOffsets; - this.lineBytes = lineBytes; - } - - public int getRowKeyOffset() { - return getColumnOffset(rowKeyColumnIndex); - } - public int getRowKeyLength() { - return getColumnLength(rowKeyColumnIndex); - } - - public long getTimestamp(long ts) throws BadTsvLineException { - // Return ts if HBASE_TS_KEY is not configured in column spec - if (!hasTimestamp()) { - return ts; - } - - String timeStampStr = Bytes.toString(lineBytes, - getColumnOffset(timestampKeyColumnIndex), - getColumnLength(timestampKeyColumnIndex)); - try { - return Long.parseLong(timeStampStr); - } catch (NumberFormatException nfe) { - // treat this record as bad record - throw new BadTsvLineException("Invalid timestamp " + timeStampStr); - } - } - - private String getAttributes() { - if (!hasAttributes()) { - return null; - } else { - return Bytes.toString(lineBytes, getColumnOffset(attrKeyColumnIndex), - getColumnLength(attrKeyColumnIndex)); - } - } - - public String[] getIndividualAttributes() { - String attributes = getAttributes(); - if (attributes != null) { - return attributes.split(DEFAULT_MULTIPLE_ATTRIBUTES_SEPERATOR); - } else { - return null; - } - } - - public int getAttributeKeyOffset() { - if (hasAttributes()) { - return getColumnOffset(attrKeyColumnIndex); - } else { - return DEFAULT_ATTRIBUTES_COLUMN_INDEX; - } - } - - public int getAttributeKeyLength() { - if (hasAttributes()) { - return getColumnLength(attrKeyColumnIndex); - } else { - return DEFAULT_ATTRIBUTES_COLUMN_INDEX; - } - } - - public int getCellVisibilityColumnOffset() { - if (hasCellVisibility()) { - return getColumnOffset(cellVisibilityColumnIndex); - } else { - return DEFAULT_CELL_VISIBILITY_COLUMN_INDEX; - } - } - - public int getCellVisibilityColumnLength() { - if (hasCellVisibility()) { - return getColumnLength(cellVisibilityColumnIndex); - } else { - return DEFAULT_CELL_VISIBILITY_COLUMN_INDEX; - } - } - - public String getCellVisibility() { - if (!hasCellVisibility()) { - return null; - } else { - return Bytes.toString(lineBytes, getColumnOffset(cellVisibilityColumnIndex), - getColumnLength(cellVisibilityColumnIndex)); - } - } - - public int getCellTTLColumnOffset() { - if (hasCellTTL()) { - return getColumnOffset(cellTTLColumnIndex); - } else { - return DEFAULT_CELL_TTL_COLUMN_INDEX; - } - } - - public int getCellTTLColumnLength() { - if (hasCellTTL()) { - return getColumnLength(cellTTLColumnIndex); - } else { - return DEFAULT_CELL_TTL_COLUMN_INDEX; - } - } - - public long getCellTTL() { - if (!hasCellTTL()) { - return 0; - } else { - return Bytes.toLong(lineBytes, getColumnOffset(cellTTLColumnIndex), - getColumnLength(cellTTLColumnIndex)); - } - } - - public int getColumnOffset(int idx) { - if (idx > 0) - return tabOffsets.get(idx - 1) + 1; - else - return 0; - } - public int getColumnLength(int idx) { - return tabOffsets.get(idx) - getColumnOffset(idx); - } - public int getColumnCount() { - return tabOffsets.size(); - } - public byte[] getLineBytes() { - return lineBytes; - } - } - - public static class BadTsvLineException extends Exception { - public BadTsvLineException(String err) { - super(err); - } - private static final long serialVersionUID = 1L; - } - - /** - * Return starting position and length of row key from the specified line bytes. 
- * @param lineBytes - * @param length - * @return Pair of row key offset and length. - * @throws BadTsvLineException - */ - public Pair<Integer, Integer> parseRowKey(byte[] lineBytes, int length) - throws BadTsvLineException { - int rkColumnIndex = 0; - int startPos = 0, endPos = 0; - for (int i = 0; i <= length; i++) { - if (i == length || lineBytes[i] == separatorByte) { - endPos = i - 1; - if (rkColumnIndex++ == getRowKeyColumnIndex()) { - if ((endPos + 1) == startPos) { - throw new BadTsvLineException("Empty value for ROW KEY."); - } - break; - } else { - startPos = endPos + 2; - } - } - if (i == length) { - throw new BadTsvLineException( - "Row key does not exist as number of columns in the line" - + " are less than row key position."); - } - } - return new Pair<>(startPos, endPos - startPos + 1); - } - } - - /** - * Sets up the actual job. - * - * @param conf The current configuration. - * @param args The command line parameters. - * @return The newly created job. - * @throws IOException When setting up the job fails. - */ - protected static Job createSubmittableJob(Configuration conf, String[] args) - throws IOException, ClassNotFoundException { - Job job = null; - boolean isDryRun = conf.getBoolean(DRY_RUN_CONF_KEY, false); - try (Connection connection = ConnectionFactory.createConnection(conf)) { - try (Admin admin = connection.getAdmin()) { - // Support non-XML supported characters - // by re-encoding the passed separator as a Base64 string. - String actualSeparator = conf.get(SEPARATOR_CONF_KEY); - if (actualSeparator != null) { - conf.set(SEPARATOR_CONF_KEY, - Base64.encodeBytes(actualSeparator.getBytes())); - } - - // See if a non-default Mapper was set - String mapperClassName = conf.get(MAPPER_CONF_KEY); - Class mapperClass = mapperClassName != null? Class.forName(mapperClassName): DEFAULT_MAPPER; - - TableName tableName = TableName.valueOf(args[0]); - Path inputDir = new Path(args[1]); - String jobName = conf.get(JOB_NAME_CONF_KEY,NAME + "_" + tableName.getNameAsString()); - job = Job.getInstance(conf, jobName); - job.setJarByClass(mapperClass); - FileInputFormat.setInputPaths(job, inputDir); - job.setInputFormatClass(TextInputFormat.class); - job.setMapperClass(mapperClass); - job.setMapOutputKeyClass(ImmutableBytesWritable.class); - String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY); - String[] columns = conf.getStrings(COLUMNS_CONF_KEY); - if(StringUtils.isNotEmpty(conf.get(CREDENTIALS_LOCATION))) { - String fileLoc = conf.get(CREDENTIALS_LOCATION); - Credentials cred = Credentials.readTokenStorageFile(new File(fileLoc), conf); - job.getCredentials().addAll(cred); - } - - if (hfileOutPath != null) { - if (!admin.tableExists(tableName)) { - LOG.warn(format("Table '%s' does not exist.", tableName)); - if ("yes".equalsIgnoreCase(conf.get(CREATE_TABLE_CONF_KEY, "yes"))) { - // TODO: this is backwards. Instead of depending on the existence of a table, - // create a sane splits file for HFileOutputFormat based on data sampling. 
- createTable(admin, tableName, columns); - if (isDryRun) { - LOG.warn("Dry run: Table will be deleted at end of dry run."); - synchronized (ImportTsv.class) { - DRY_RUN_TABLE_CREATED = true; - } - } - } else { - String errorMsg = - format("Table '%s' does not exist and '%s' is set to no.", tableName, - CREATE_TABLE_CONF_KEY); - LOG.error(errorMsg); - throw new TableNotFoundException(errorMsg); - } - } - try (Table table = connection.getTable(tableName); - RegionLocator regionLocator = connection.getRegionLocator(tableName)) { - boolean noStrict = conf.getBoolean(NO_STRICT_COL_FAMILY, false); - // if no.strict is false then check column family - if(!noStrict) { - ArrayList<String> unmatchedFamilies = new ArrayList<>(); - Set<String> cfSet = getColumnFamilies(columns); - TableDescriptor tDesc = table.getDescriptor(); - for (String cf : cfSet) { - if(!tDesc.hasColumnFamily(Bytes.toBytes(cf))) { - unmatchedFamilies.add(cf); - } - } - if(unmatchedFamilies.size() > 0) { - ArrayList<String> familyNames = new ArrayList<>(); - for (ColumnFamilyDescriptor family : table.getDescriptor().getColumnFamilies()) { - familyNames.add(family.getNameAsString()); - } - String msg = - "Column Families " + unmatchedFamilies + " specified in " + COLUMNS_CONF_KEY - + " does not match with any of the table " + tableName - + " column families " + familyNames + ".\n" - + "To disable column family check, use -D" + NO_STRICT_COL_FAMILY - + "=true.\n"; - usage(msg); - System.exit(-1); - } - } - if (mapperClass.equals(TsvImporterTextMapper.class)) { - job.setMapOutputValueClass(Text.class); - job.setReducerClass(TextSortReducer.class); - } else { - job.setMapOutputValueClass(Put.class); - job.setCombinerClass(PutCombiner.class); - job.setReducerClass(PutSortReducer.class); - } - if (!isDryRun) { - Path outputDir = new Path(hfileOutPath); - FileOutputFormat.setOutputPath(job, outputDir); - HFileOutputFormat2.configureIncrementalLoad(job, table.getDescriptor(), - regionLocator); - } - } - } else { - if (!admin.tableExists(tableName)) { - String errorMsg = format("Table '%s' does not exist.", tableName); - LOG.error(errorMsg); - throw new TableNotFoundException(errorMsg); - } - if (mapperClass.equals(TsvImporterTextMapper.class)) { - usage(TsvImporterTextMapper.class.toString() - + " should not be used for non bulkloading case. use " - + TsvImporterMapper.class.toString() - + " or custom mapper whose value type is Put."); - System.exit(-1); - } - if (!isDryRun) { - // No reducers. Just write straight to table. Call initTableReducerJob - // to set up the TableOutputFormat. 
- TableMapReduceUtil.initTableReducerJob(tableName.getNameAsString(), null, job); - } - job.setNumReduceTasks(0); - } - if (isDryRun) { - job.setOutputFormatClass(NullOutputFormat.class); - job.getConfiguration().setStrings("io.serializations", - job.getConfiguration().get("io.serializations"), - MutationSerialization.class.getName(), ResultSerialization.class.getName(), - KeyValueSerialization.class.getName()); - } - TableMapReduceUtil.addDependencyJars(job); - TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(), - org.apache.hadoop.hbase.shaded.com.google.common.base.Function.class /* Guava used by TsvParser */); - } - } - return job; - } - - private static void createTable(Admin admin, TableName tableName, String[] columns) - throws IOException { - HTableDescriptor htd = new HTableDescriptor(tableName); - Set<String> cfSet = getColumnFamilies(columns); - for (String cf : cfSet) { - HColumnDescriptor hcd = new HColumnDescriptor(Bytes.toBytes(cf)); - htd.addFamily(hcd); - } - LOG.warn(format("Creating table '%s' with '%s' columns and default descriptors.", - tableName, cfSet)); - admin.createTable(htd); - } - - private static void deleteTable(Configuration conf, String[] args) { - TableName tableName = TableName.valueOf(args[0]); - try (Connection connection = ConnectionFactory.createConnection(conf); - Admin admin = connection.getAdmin()) { - try { - admin.disableTable(tableName); - } catch (TableNotEnabledException e) { - LOG.debug("Dry mode: Table: " + tableName + " already disabled, so just deleting it."); - } - admin.deleteTable(tableName); - } catch (IOException e) { - LOG.error(format("***Dry run: Failed to delete table '%s'.***%n%s", tableName, - e.toString())); - return; - } - LOG.info(format("Dry run: Deleted table '%s'.", tableName)); - } - - private static Set<String> getColumnFamilies(String[] columns) { - Set<String> cfSet = new HashSet<>(); - for (String aColumn : columns) { - if (TsvParser.ROWKEY_COLUMN_SPEC.equals(aColumn) - || TsvParser.TIMESTAMPKEY_COLUMN_SPEC.equals(aColumn) - || TsvParser.CELL_VISIBILITY_COLUMN_SPEC.equals(aColumn) - || TsvParser.CELL_TTL_COLUMN_SPEC.equals(aColumn) - || TsvParser.ATTRIBUTES_COLUMN_SPEC.equals(aColumn)) - continue; - // we are only concerned with the first one (in case this is a cf:cq) - cfSet.add(aColumn.split(":", 2)[0]); - } - return cfSet; - } - - /* - * @param errorMsg Error message. Can be null. - */ - private static void usage(final String errorMsg) { - if (errorMsg != null && errorMsg.length() > 0) { - System.err.println("ERROR: " + errorMsg); - } - String usage = - "Usage: " + NAME + " -D"+ COLUMNS_CONF_KEY + "=a,b,c <tablename> <inputdir>\n" + - "\n" + - "Imports the given input directory of TSV data into the specified table.\n" + - "\n" + - "The column names of the TSV data must be specified using the -D" + COLUMNS_CONF_KEY + "\n" + - "option. This option takes the form of comma-separated column names, where each\n" + - "column name is either a simple column family, or a columnfamily:qualifier. The special\n" + - "column name " + TsvParser.ROWKEY_COLUMN_SPEC + " is used to designate that this column should be used\n" + - "as the row key for each imported record. You must specify exactly one column\n" + - "to be the row key, and you must specify a column name for every column that exists in the\n" + - "input data. Another special column" + TsvParser.TIMESTAMPKEY_COLUMN_SPEC + - " designates that this column should be\n" + - "used as timestamp for each record. 
Unlike " + TsvParser.ROWKEY_COLUMN_SPEC + ", " + - TsvParser.TIMESTAMPKEY_COLUMN_SPEC + " is optional." + "\n" + - "You must specify at most one column as timestamp key for each imported record.\n" + - "Record with invalid timestamps (blank, non-numeric) will be treated as bad record.\n" + - "Note: if you use this option, then '" + TIMESTAMP_CONF_KEY + "' option will be ignored.\n" + - "\n" + - "Other special columns that can be specified are " + TsvParser.CELL_TTL_COLUMN_SPEC + - " and " + TsvParser.CELL_VISIBILITY_COLUMN_SPEC + ".\n" + - TsvParser.CELL_TTL_COLUMN_SPEC + " designates that this column will be used " + - "as a Cell's Time To Live (TTL) attribute.\n" + - TsvParser.CELL_VISIBILITY_COLUMN_SPEC + " designates that this column contains the " + - "visibility label expression.\n" + - "\n" + - TsvParser.ATTRIBUTES_COLUMN_SPEC+" can be used to specify Operation Attributes per record.\n"+ - " Should be specified as key=>value where "+TsvParser.DEFAULT_ATTRIBUTES_COLUMN_INDEX+ " is used \n"+ - " as the seperator. Note that more than one OperationAttributes can be specified.\n"+ - "By default importtsv will load data directly into HBase. To instead generate\n" + - "HFiles of data to prepare for a bulk data load, pass the option:\n" + - " -D" + BULK_OUTPUT_CONF_KEY + "=/path/for/output\n" + - " Note: if you do not use this option, then the target table must already exist in HBase\n" + - "\n" + - "Other options that may be specified with -D include:\n" + - " -D" + DRY_RUN_CONF_KEY + "=true - Dry run mode. Data is not actually populated into" + - " table. If table does not exist, it is created but deleted in the end.\n" + - " -D" + SKIP_LINES_CONF_KEY + "=false - fail if encountering an invalid line\n" + - " -D" + LOG_BAD_LINES_CONF_KEY + "=true - logs invalid lines to stderr\n" + - " -D" + SKIP_EMPTY_COLUMNS + "=false - If true then skip empty columns in bulk import\n" + - " '-D" + SEPARATOR_CONF_KEY + "=|' - eg separate on pipes instead of tabs\n" + - " -D" + TIMESTAMP_CONF_KEY + "=currentTimeAsLong - use the specified timestamp for the import\n" + - " -D" + MAPPER_CONF_KEY + "=my.Mapper - A user-defined Mapper to use instead of " + - DEFAULT_MAPPER.getName() + "\n" + - " -D" + JOB_NAME_CONF_KEY + "=jobName - use the specified mapreduce job name for the import\n" + - " -D" + CREATE_TABLE_CONF_KEY + "=no - can be used to avoid creation of table by this tool\n" + - " Note: if you set this to 'no', then the target table must already exist in HBase\n" + - " -D" + NO_STRICT_COL_FAMILY + "=true - ignore column family check in hbase table. " + - "Default is false\n\n" + - "For performance consider the following options:\n" + - " -Dmapreduce.map.speculative=false\n" + - " -Dmapreduce.reduce.speculative=false"; - - System.err.println(usage); - } - - @Override - public int run(String[] args) throws Exception { - if (args.length < 2) { - usage("Wrong number of arguments: " + args.length); - return -1; - } - - // When MAPPER_CONF_KEY is null, the user wants to use the provided TsvImporterMapper, so - // perform validation on these additional args. When it's not null, user has provided their - // own mapper, thus these validation are not relevant. - // TODO: validation for TsvImporterMapper, not this tool. Move elsewhere. - if (null == getConf().get(MAPPER_CONF_KEY)) { - // Make sure columns are specified - String[] columns = getConf().getStrings(COLUMNS_CONF_KEY); - if (columns == null) { - usage("No columns specified. 
Please specify with -D" + - COLUMNS_CONF_KEY+"=..."); - return -1; - } - - // Make sure they specify exactly one column as the row key - int rowkeysFound = 0; - for (String col : columns) { - if (col.equals(TsvParser.ROWKEY_COLUMN_SPEC)) rowkeysFound++; - } - if (rowkeysFound != 1) { - usage("Must specify exactly one column as " + TsvParser.ROWKEY_COLUMN_SPEC); - return -1; - } - - // Make sure we have at most one column as the timestamp key - int tskeysFound = 0; - for (String col : columns) { - if (col.equals(TsvParser.TIMESTAMPKEY_COLUMN_SPEC)) - tskeysFound++; - } - if (tskeysFound > 1) { - usage("Must specify at most one column as " - + TsvParser.TIMESTAMPKEY_COLUMN_SPEC); - return -1; - } - - int attrKeysFound = 0; - for (String col : columns) { - if (col.equals(TsvParser.ATTRIBUTES_COLUMN_SPEC)) - attrKeysFound++; - } - if (attrKeysFound > 1) { - usage("Must specify at most one column as " - + TsvParser.ATTRIBUTES_COLUMN_SPEC); - return -1; - } - - // Make sure one or more columns are specified excluding rowkey and - // timestamp key - if (columns.length - (rowkeysFound + tskeysFound + attrKeysFound) < 1) { - usage("One or more columns in addition to the row key and timestamp(optional) are required"); - return -1; - } - } - - // If timestamp option is not specified, use current system time. - long timstamp = getConf().getLong(TIMESTAMP_CONF_KEY, System.currentTimeMillis()); - - // Set it back to replace invalid timestamp (non-numeric) with current - // system time - getConf().setLong(TIMESTAMP_CONF_KEY, timstamp); - - synchronized (ImportTsv.class) { - DRY_RUN_TABLE_CREATED = false; - } - Job job = createSubmittableJob(getConf(), args); - boolean success = job.waitForCompletion(true); - boolean delete = false; - synchronized (ImportTsv.class) { - delete = DRY_RUN_TABLE_CREATED; - } - if (delete) { - deleteTable(getConf(), args); - } - return success ? 0 : 1; - } - - public static void main(String[] args) throws Exception { - int status = ToolRunner.run(HBaseConfiguration.create(), new ImportTsv(), args); - System.exit(status); - } -} http://git-wip-us.apache.org/repos/asf/hbase/blob/59d03410/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/JarFinder.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/JarFinder.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/JarFinder.java deleted file mode 100644 index 953df62..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/JarFinder.java +++ /dev/null @@ -1,186 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.hadoop.hbase.mapreduce; - -import org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions; - -import java.io.BufferedOutputStream; -import java.io.File; -import java.io.FileInputStream; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.net.URL; -import java.net.URLDecoder; -import java.text.MessageFormat; -import java.util.Enumeration; -import java.util.jar.JarFile; -import java.util.jar.JarOutputStream; -import java.util.jar.Manifest; -import java.util.zip.ZipEntry; -import java.util.zip.ZipOutputStream; - -/** - * Finds the Jar for a class. If the class is in a directory in the - * classpath, it creates a Jar on the fly with the contents of the directory - * and returns the path to that Jar. If a Jar is created, it is created in - * the system temporary directory. - * - * This file was forked from hadoop/common/branches/branch-2@1377176. - */ -public class JarFinder { - - private static void copyToZipStream(File file, ZipEntry entry, - ZipOutputStream zos) throws IOException { - InputStream is = new FileInputStream(file); - try { - zos.putNextEntry(entry); - byte[] arr = new byte[4096]; - int read = is.read(arr); - while (read > -1) { - zos.write(arr, 0, read); - read = is.read(arr); - } - } finally { - try { - is.close(); - } finally { - zos.closeEntry(); - } - } - } - - public static void jarDir(File dir, String relativePath, ZipOutputStream zos) - throws IOException { - Preconditions.checkNotNull(relativePath, "relativePath"); - Preconditions.checkNotNull(zos, "zos"); - - // by JAR spec, if there is a manifest, it must be the first entry in the - // ZIP. - File manifestFile = new File(dir, JarFile.MANIFEST_NAME); - ZipEntry manifestEntry = new ZipEntry(JarFile.MANIFEST_NAME); - if (!manifestFile.exists()) { - zos.putNextEntry(manifestEntry); - new Manifest().write(new BufferedOutputStream(zos)); - zos.closeEntry(); - } else { - copyToZipStream(manifestFile, manifestEntry, zos); - } - zos.closeEntry(); - zipDir(dir, relativePath, zos, true); - zos.close(); - } - - private static void zipDir(File dir, String relativePath, ZipOutputStream zos, - boolean start) throws IOException { - String[] dirList = dir.list(); - if (dirList == null) { - return; - } - for (String aDirList : dirList) { - File f = new File(dir, aDirList); - if (!f.isHidden()) { - if (f.isDirectory()) { - if (!start) { - ZipEntry dirEntry = new ZipEntry(relativePath + f.getName() + "/"); - zos.putNextEntry(dirEntry); - zos.closeEntry(); - } - String filePath = f.getPath(); - File file = new File(filePath); - zipDir(file, relativePath + f.getName() + "/", zos, false); - } - else { - String path = relativePath + f.getName(); - if (!path.equals(JarFile.MANIFEST_NAME)) { - ZipEntry anEntry = new ZipEntry(path); - copyToZipStream(f, anEntry, zos); - } - } - } - } - } - - private static void createJar(File dir, File jarFile) throws IOException { - Preconditions.checkNotNull(dir, "dir"); - Preconditions.checkNotNull(jarFile, "jarFile"); - File jarDir = jarFile.getParentFile(); - if (!jarDir.exists()) { - if (!jarDir.mkdirs()) { - throw new IOException(MessageFormat.format("could not create dir [{0}]", - jarDir)); - } - } - try (FileOutputStream fos = new FileOutputStream(jarFile); - JarOutputStream jos = new JarOutputStream(fos)) { - jarDir(dir, "", jos); - } - } - - /** - * Returns the full path to the Jar containing the class. It always return a - * JAR. - * - * @param klass class. - * - * @return path to the Jar containing the class. 
- */ - public static String getJar(Class klass) { - Preconditions.checkNotNull(klass, "klass"); - ClassLoader loader = klass.getClassLoader(); - if (loader != null) { - String class_file = klass.getName().replaceAll("\\.", "/") + ".class"; - try { - for (Enumeration itr = loader.getResources(class_file); - itr.hasMoreElements(); ) { - URL url = (URL) itr.nextElement(); - String path = url.getPath(); - if (path.startsWith("file:")) { - path = path.substring("file:".length()); - } - path = URLDecoder.decode(path, "UTF-8"); - if ("jar".equals(url.getProtocol())) { - path = URLDecoder.decode(path, "UTF-8"); - return path.replaceAll("!.*$", ""); - } - else if ("file".equals(url.getProtocol())) { - String klassName = klass.getName(); - klassName = klassName.replace(".", "/") + ".class"; - path = path.substring(0, path.length() - klassName.length()); - File baseDir = new File(path); - File testDir = new File(System.getProperty("test.build.dir", "target/test-dir")); - testDir = testDir.getAbsoluteFile(); - if (!testDir.exists()) { - testDir.mkdirs(); - } - File tempJar = File.createTempFile("hadoop-", "", testDir); - tempJar = new File(tempJar.getAbsolutePath() + ".jar"); - tempJar.deleteOnExit(); - createJar(baseDir, tempJar); - return tempJar.getAbsolutePath(); - } - } - } - catch (IOException e) { - throw new RuntimeException(e); - } - } - return null; - } -} http://git-wip-us.apache.org/repos/asf/hbase/blob/59d03410/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/KeyValueSerialization.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/KeyValueSerialization.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/KeyValueSerialization.java deleted file mode 100644 index 241608b..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/KeyValueSerialization.java +++ /dev/null @@ -1,88 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.hbase.mapreduce; - -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; - -import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.KeyValueUtil; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.io.serializer.Deserializer; -import org.apache.hadoop.io.serializer.Serialization; -import org.apache.hadoop.io.serializer.Serializer; - -@InterfaceAudience.Public -public class KeyValueSerialization implements Serialization<KeyValue> { - @Override - public boolean accept(Class<?> c) { - return KeyValue.class.isAssignableFrom(c); - } - - @Override - public KeyValueDeserializer getDeserializer(Class<KeyValue> t) { - return new KeyValueDeserializer(); - } - - @Override - public KeyValueSerializer getSerializer(Class<KeyValue> c) { - return new KeyValueSerializer(); - } - - public static class KeyValueDeserializer implements Deserializer<KeyValue> { - private DataInputStream dis; - - @Override - public void close() throws IOException { - this.dis.close(); - } - - @Override - public KeyValue deserialize(KeyValue ignore) throws IOException { - // I can't overwrite the passed in KV, not from a proto kv, not just yet. TODO - return KeyValueUtil.create(this.dis); - } - - @Override - public void open(InputStream is) throws IOException { - this.dis = new DataInputStream(is); - } - } - - public static class KeyValueSerializer implements Serializer<KeyValue> { - private DataOutputStream dos; - - @Override - public void close() throws IOException { - this.dos.close(); - } - - @Override - public void open(OutputStream os) throws IOException { - this.dos = new DataOutputStream(os); - } - - @Override - public void serialize(KeyValue kv) throws IOException { - KeyValueUtil.write(kv, this.dos); - } - } -} http://git-wip-us.apache.org/repos/asf/hbase/blob/59d03410/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/KeyValueSortReducer.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/KeyValueSortReducer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/KeyValueSortReducer.java deleted file mode 100644 index 5c7ace2..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/KeyValueSortReducer.java +++ /dev/null @@ -1,56 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.hbase.mapreduce; - -import java.util.TreeSet; - -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.CellComparator; -import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.mapreduce.Reducer; - -/** - * Emits sorted KeyValues. - * Reads in all KeyValues from passed Iterator, sorts them, then emits - * KeyValues in sorted order. If lots of columns per row, it will use lots of - * memory sorting. - * @see HFileOutputFormat2 - */ -@InterfaceAudience.Public -public class KeyValueSortReducer extends Reducer<ImmutableBytesWritable, KeyValue, ImmutableBytesWritable, KeyValue> { - protected void reduce(ImmutableBytesWritable row, java.lang.Iterable<KeyValue> kvs, - org.apache.hadoop.mapreduce.Reducer<ImmutableBytesWritable, KeyValue, ImmutableBytesWritable, KeyValue>.Context context) - throws java.io.IOException, InterruptedException { - TreeSet<KeyValue> map = new TreeSet<>(CellComparator.COMPARATOR); - for (KeyValue kv: kvs) { - try { - map.add(kv.clone()); - } catch (CloneNotSupportedException e) { - throw new java.io.IOException(e); - } - } - context.setStatus("Read " + map.getClass()); - int index = 0; - for (KeyValue kv: map) { - context.write(row, kv); - if (++index % 100 == 0) context.setStatus("Wrote " + index); - } - } -} http://git-wip-us.apache.org/repos/asf/hbase/blob/59d03410/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableHFileOutputFormat.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableHFileOutputFormat.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableHFileOutputFormat.java deleted file mode 100644 index d7c7cc0..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableHFileOutputFormat.java +++ /dev/null @@ -1,122 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hadoop.hbase.mapreduce; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.client.TableDescriptor; -import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.mapreduce.Job; - -import java.io.IOException; -import java.nio.charset.Charset; -import java.util.List; -import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting; - -/** - * Create 3 level tree directory, first level is using table name as parent - * directory and then use family name as child directory, and all related HFiles - * for one family are under child directory - * -tableName1 - * -columnFamilyName1 - * -columnFamilyName2 - * -HFiles - * -tableName2 - * -columnFamilyName1 - * -HFiles - * -columnFamilyName2 - */ -@InterfaceAudience.Public -@VisibleForTesting -public class MultiTableHFileOutputFormat extends HFileOutputFormat2 { - private static final Log LOG = LogFactory.getLog(MultiTableHFileOutputFormat.class); - - /** - * Creates a composite key to use as a mapper output key when using - * MultiTableHFileOutputFormat.configureIncrementalLoad to set up bulk ingest job - * - * @param tableName Name of the Table - Eg: TableName.getNameAsString() - * @param suffix Usually represents a rowkey when creating a mapper key or column family - * @return byte[] representation of composite key - */ - public static byte[] createCompositeKey(byte[] tableName, - byte[] suffix) { - return combineTableNameSuffix(tableName, suffix); - } - - /** - * Alternate api which accepts an ImmutableBytesWritable for the suffix - * @see MultiTableHFileOutputFormat#createCompositeKey(byte[], byte[]) - */ - public static byte[] createCompositeKey(byte[] tableName, - ImmutableBytesWritable suffix) { - return combineTableNameSuffix(tableName, suffix.get()); - } - - /** - * Alternate api which accepts a String for the tableName and ImmutableBytesWritable for the - * suffix - * @see MultiTableHFileOutputFormat#createCompositeKey(byte[], byte[]) - */ - public static byte[] createCompositeKey(String tableName, - ImmutableBytesWritable suffix) { - return combineTableNameSuffix(tableName.getBytes(Charset.forName("UTF-8")), suffix.get()); - } - - /** - * Analogous to - * {@link HFileOutputFormat2#configureIncrementalLoad(Job, TableDescriptor, RegionLocator)}, - * this function will configure the requisite number of reducers to write HFiles for multiple - * tables simultaneously - * - * @param job See {@link org.apache.hadoop.mapreduce.Job} - * @param multiTableDescriptors Table descriptor and region locator pairs - * @throws IOException - */ - public static void configureIncrementalLoad(Job job, List<TableInfo> - multiTableDescriptors) - throws IOException { - MultiTableHFileOutputFormat.configureIncrementalLoad(job, multiTableDescriptors, - MultiTableHFileOutputFormat.class); - } - - final private static int validateCompositeKey(byte[] keyBytes) { - - int separatorIdx = Bytes.indexOf(keyBytes, HFileOutputFormat2.tableSeparator); - - // Either the separator was not found or a tablename wasn't present or a key wasn't present - if (separatorIdx == -1) { - throw new IllegalArgumentException("Invalid format for composite key [" + Bytes - .toStringBinary(keyBytes) + "]. 
Cannot extract tablename and suffix from key"); - } - return separatorIdx; - } - - protected static byte[] getTableName(byte[] keyBytes) { - int separatorIdx = validateCompositeKey(keyBytes); - return Bytes.copy(keyBytes, 0, separatorIdx); - } - - protected static byte[] getSuffix(byte[] keyBytes) { - int separatorIdx = validateCompositeKey(keyBytes); - return Bytes.copy(keyBytes, separatorIdx+1, keyBytes.length - separatorIdx - 1); - } -} http://git-wip-us.apache.org/repos/asf/hbase/blob/59d03410/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableInputFormat.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableInputFormat.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableInputFormat.java deleted file mode 100644 index a8e6837..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableInputFormat.java +++ /dev/null @@ -1,104 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.mapreduce; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; - -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.conf.Configurable; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.client.Scan; - -/** - * Convert HBase tabular data from multiple scanners into a format that - * is consumable by Map/Reduce. - * - * <p> - * Usage example - * </p> - * - * <pre> - * List<Scan> scans = new ArrayList<Scan>(); - * - * Scan scan1 = new Scan(); - * scan1.setStartRow(firstRow1); - * scan1.setStopRow(lastRow1); - * scan1.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, table1); - * scans.add(scan1); - * - * Scan scan2 = new Scan(); - * scan2.setStartRow(firstRow2); - * scan2.setStopRow(lastRow2); - * scan2.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, table2); - * scans.add(scan2); - * - * TableMapReduceUtil.initTableMapperJob(scans, TableMapper.class, Text.class, - * IntWritable.class, job); - * </pre> - */ -@InterfaceAudience.Public -public class MultiTableInputFormat extends MultiTableInputFormatBase implements - Configurable { - - /** Job parameter that specifies the scan list. */ - public static final String SCANS = "hbase.mapreduce.scans"; - - /** The configuration. */ - private Configuration conf = null; - - /** - * Returns the current configuration. - * - * @return The current configuration. - * @see org.apache.hadoop.conf.Configurable#getConf() - */ - @Override - public Configuration getConf() { - return conf; - } - - /** - * Sets the configuration. This is used to set the details for the tables to - * be scanned. 
- * - * @param configuration The configuration to set. - * @see org.apache.hadoop.conf.Configurable#setConf( - * org.apache.hadoop.conf.Configuration) - */ - @Override - public void setConf(Configuration configuration) { - this.conf = configuration; - String[] rawScans = conf.getStrings(SCANS); - if (rawScans.length <= 0) { - throw new IllegalArgumentException("There must be at least 1 scan configuration set to : " - + SCANS); - } - List<Scan> scans = new ArrayList<>(); - - for (int i = 0; i < rawScans.length; i++) { - try { - scans.add(TableMapReduceUtil.convertStringToScan(rawScans[i])); - } catch (IOException e) { - throw new RuntimeException("Failed to convert string : " + rawScans[i] + " to Scan", e); - } - } - this.setScans(scans); - } -}
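To make the multi-scan usage described in the removed MultiTableInputFormat javadoc concrete, here is a minimal, self-contained driver sketch along the same lines. It is illustrative only and not part of this patch: the table names, row boundaries, placeholder mapper, and output key/value types are hypothetical.

import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;

public class MultiScanJobSketch {

  // Trivial placeholder mapper; a real job would emit something useful per row.
  static class ExampleMapper extends TableMapper<Text, IntWritable> {
    @Override
    protected void map(ImmutableBytesWritable row, Result value, Context context) {
      // no-op
    }
  }

  public static void main(String[] args) throws Exception {
    Job job = Job.getInstance(HBaseConfiguration.create(), "multi-scan-sketch");
    job.setJarByClass(MultiScanJobSketch.class);

    List<Scan> scans = new ArrayList<>();

    // Each Scan carries its target table via the SCAN_ATTRIBUTES_TABLE_NAME attribute.
    Scan scan1 = new Scan();
    scan1.setStartRow(Bytes.toBytes("row-aaa"));
    scan1.setStopRow(Bytes.toBytes("row-mmm"));
    scan1.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, Bytes.toBytes("table1"));
    scans.add(scan1);

    Scan scan2 = new Scan();
    scan2.setStartRow(Bytes.toBytes("row-mmm"));
    scan2.setStopRow(Bytes.toBytes("row-zzz"));
    scan2.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, Bytes.toBytes("table2"));
    scans.add(scan2);

    // The List<Scan> overload wires the multi-table input format into the job.
    TableMapReduceUtil.initTableMapperJob(scans, ExampleMapper.class, Text.class,
        IntWritable.class, job);

    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}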

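In the same spirit, a minimal sketch of a mapper that builds the composite keys the removed MultiTableHFileOutputFormat routes on, using the createCompositeKey(byte[], ImmutableBytesWritable) overload shown above. The source scan, the target table name, and the KeyValue output value type are assumptions chosen for illustration.

import java.io.IOException;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.MultiTableHFileOutputFormat;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;

// Hypothetical mapper: prefixes each output key with the destination table name so that
// MultiTableHFileOutputFormat can route the cell into the right table/family directory.
public class CompositeKeyMapperSketch extends TableMapper<ImmutableBytesWritable, KeyValue> {

  private static final byte[] TARGET_TABLE = Bytes.toBytes("table1"); // placeholder table name

  @Override
  protected void map(ImmutableBytesWritable row, Result value, Context context)
      throws IOException, InterruptedException {
    // Composite key = target table name, the table separator, then the original row key.
    byte[] compositeKey = MultiTableHFileOutputFormat.createCompositeKey(TARGET_TABLE, row);
    ImmutableBytesWritable outKey = new ImmutableBytesWritable(compositeKey);
    for (Cell cell : value.rawCells()) {
      context.write(outKey, KeyValueUtil.ensureKeyValue(cell));
    }
  }
}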