http://git-wip-us.apache.org/repos/asf/hbase/blob/59d03410/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HRegionPartitioner.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HRegionPartitioner.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HRegionPartitioner.java
new file mode 100644
index 0000000..3c3060b
--- /dev/null
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HRegionPartitioner.java
@@ -0,0 +1,140 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.RegionLocator;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapred.TableOutputFormat;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapreduce.Partitioner;
+
+/**
+ * This is used to partition the output keys into groups of keys.
+ * Keys are grouped according to the regions that currently exist,
+ * so that each reducer fills a single region and the load is distributed.
+ *
+ * <p>This class is not suitable as a partitioner when creating hfiles
+ * for incremental bulk loads, as the region spread will likely change between
+ * the time of hfile creation and load time. See {@link LoadIncrementalHFiles}
+ * and <a href="http://hbase.apache.org/book.html#arch.bulk.load">Bulk Load</a>.</p>
+ *
+ * @param <KEY> The type of the key.
+ * @param <VALUE> The type of the value.
+ */
+@InterfaceAudience.Public
+public class HRegionPartitioner<KEY, VALUE>
+extends Partitioner<ImmutableBytesWritable, VALUE>
+implements Configurable {
+
+  private static final Log LOG = LogFactory.getLog(HRegionPartitioner.class);
+  private Configuration conf = null;
+  // Connection and locator are not cleaned up; they just die when the partitioner is done.
+  private Connection connection;
+  private RegionLocator locator;
+  private byte[][] startKeys;
+
+  /**
+   * Gets the partition number for a given key (hence record) given the total
+   * number of partitions, i.e. the number of reduce tasks for the job.
+   *
+   * <p>Typically a hash function on all or a subset of the key.</p>
+   *
+   * @param key The key to be partitioned.
+   * @param value The entry value.
+   * @param numPartitions The total number of partitions.
+   * @return The partition number for the <code>key</code>.
+   * @see org.apache.hadoop.mapreduce.Partitioner#getPartition(
+   *   java.lang.Object, java.lang.Object, int)
+   */
+  @Override
+  public int getPartition(ImmutableBytesWritable key,
+      VALUE value, int numPartitions) {
+    byte[] region = null;
+    // Only one region: everything goes to partition 0.
+    if (this.startKeys.length == 1){
+      return 0;
+    }
+    try {
+      // Not sure if this is cached after a split, so we could have problems
+      // here if a region splits while mapping.
+      region = this.locator.getRegionLocation(key.get()).getRegionInfo().getStartKey();
+    } catch (IOException e) {
+      LOG.error(e);
+    }
+    for (int i = 0; i < this.startKeys.length; i++){
+      if (Bytes.compareTo(region, this.startKeys[i]) == 0 ){
+        if (i >= numPartitions-1){
+          // cover the case where we have fewer reducers than regions.
+          return (Integer.toString(i).hashCode()
+              & Integer.MAX_VALUE) % numPartitions;
+        }
+        return i;
+      }
+    }
+    // if the above fails to find a matching start key, we still need to return something
+    return 0;
+  }
+
+  /**
+   * Returns the current configuration.
+   *
+   * @return The current configuration.
+   * @see org.apache.hadoop.conf.Configurable#getConf()
+   */
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  /**
+   * Sets the configuration. This is used to determine the start keys for the
+   * given table.
+   *
+   * @param configuration The configuration to set.
+   * @see org.apache.hadoop.conf.Configurable#setConf(
+   *   org.apache.hadoop.conf.Configuration)
+   */
+  @Override
+  public void setConf(Configuration configuration) {
+    this.conf = HBaseConfiguration.create(configuration);
+    try {
+      this.connection = ConnectionFactory.createConnection(HBaseConfiguration.create(conf));
+      TableName tableName = TableName.valueOf(conf.get(TableOutputFormat.OUTPUT_TABLE));
+      this.locator = this.connection.getRegionLocator(tableName);
+    } catch (IOException e) {
+      LOG.error(e);
+    }
+    try {
+      this.startKeys = this.locator.getStartKeys();
+    } catch (IOException e) {
+      LOG.error(e);
+    }
+  }
+}
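As a usage sketch (assuming the four-argument
TableMapReduceUtil.initTableReducerJob overload; the table name "mytable" and
MyTableReducer are hypothetical placeholders, not part of this patch), a job
routes each reducer's output to a single region like so:

  Configuration conf = HBaseConfiguration.create();
  Job job = Job.getInstance(conf, "write-to-mytable");
  // The fourth argument installs HRegionPartitioner, so output keys are
  // grouped by the table's current region boundaries.
  TableMapReduceUtil.initTableReducerJob(
      "mytable",                  // output table
      MyTableReducer.class,       // a TableReducer emitting Puts/Deletes
      job,
      HRegionPartitioner.class);  // one reducer fills one region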
http://git-wip-us.apache.org/repos/asf/hbase/blob/59d03410/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HashTable.java ---------------------------------------------------------------------- diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HashTable.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HashTable.java new file mode 100644 index 0000000..2c8caf5 --- /dev/null +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HashTable.java @@ -0,0 +1,747 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mapreduce; + +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.OutputStreamWriter; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Properties; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.io.MapFile; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.Reducer; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.hadoop.mapreduce.lib.output.MapFileOutputFormat; +import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner; +import org.apache.hadoop.util.GenericOptionsParser; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; + +import org.apache.hadoop.hbase.shaded.com.google.common.base.Charsets; +import org.apache.hadoop.hbase.shaded.com.google.common.base.Throwables; +import org.apache.hadoop.hbase.shaded.com.google.common.collect.Ordering; + +public class HashTable extends Configured implements Tool { + + private static final Log LOG = LogFactory.getLog(HashTable.class); + + private static final int DEFAULT_BATCH_SIZE = 8000; + + private final static String HASH_BATCH_SIZE_CONF_KEY = 
"hash.batch.size"; + final static String PARTITIONS_FILE_NAME = "partitions"; + final static String MANIFEST_FILE_NAME = "manifest"; + final static String HASH_DATA_DIR = "hashes"; + final static String OUTPUT_DATA_FILE_PREFIX = "part-r-"; + private final static String TMP_MANIFEST_FILE_NAME = "manifest.tmp"; + + TableHash tableHash = new TableHash(); + Path destPath; + + public HashTable(Configuration conf) { + super(conf); + } + + public static class TableHash { + + Path hashDir; + + String tableName; + String families = null; + long batchSize = DEFAULT_BATCH_SIZE; + int numHashFiles = 0; + byte[] startRow = HConstants.EMPTY_START_ROW; + byte[] stopRow = HConstants.EMPTY_END_ROW; + int scanBatch = 0; + int versions = -1; + long startTime = 0; + long endTime = 0; + + List<ImmutableBytesWritable> partitions; + + public static TableHash read(Configuration conf, Path hashDir) throws IOException { + TableHash tableHash = new TableHash(); + FileSystem fs = hashDir.getFileSystem(conf); + tableHash.hashDir = hashDir; + tableHash.readPropertiesFile(fs, new Path(hashDir, MANIFEST_FILE_NAME)); + tableHash.readPartitionFile(fs, conf, new Path(hashDir, PARTITIONS_FILE_NAME)); + return tableHash; + } + + void writePropertiesFile(FileSystem fs, Path path) throws IOException { + Properties p = new Properties(); + p.setProperty("table", tableName); + if (families != null) { + p.setProperty("columnFamilies", families); + } + p.setProperty("targetBatchSize", Long.toString(batchSize)); + p.setProperty("numHashFiles", Integer.toString(numHashFiles)); + if (!isTableStartRow(startRow)) { + p.setProperty("startRowHex", Bytes.toHex(startRow)); + } + if (!isTableEndRow(stopRow)) { + p.setProperty("stopRowHex", Bytes.toHex(stopRow)); + } + if (scanBatch > 0) { + p.setProperty("scanBatch", Integer.toString(scanBatch)); + } + if (versions >= 0) { + p.setProperty("versions", Integer.toString(versions)); + } + if (startTime != 0) { + p.setProperty("startTimestamp", Long.toString(startTime)); + } + if (endTime != 0) { + p.setProperty("endTimestamp", Long.toString(endTime)); + } + + try (OutputStreamWriter osw = new OutputStreamWriter(fs.create(path), Charsets.UTF_8)) { + p.store(osw, null); + } + } + + void readPropertiesFile(FileSystem fs, Path path) throws IOException { + Properties p = new Properties(); + try (FSDataInputStream in = fs.open(path)) { + try (InputStreamReader isr = new InputStreamReader(in, Charsets.UTF_8)) { + p.load(isr); + } + } + tableName = p.getProperty("table"); + families = p.getProperty("columnFamilies"); + batchSize = Long.parseLong(p.getProperty("targetBatchSize")); + numHashFiles = Integer.parseInt(p.getProperty("numHashFiles")); + + String startRowHex = p.getProperty("startRowHex"); + if (startRowHex != null) { + startRow = Bytes.fromHex(startRowHex); + } + String stopRowHex = p.getProperty("stopRowHex"); + if (stopRowHex != null) { + stopRow = Bytes.fromHex(stopRowHex); + } + + String scanBatchString = p.getProperty("scanBatch"); + if (scanBatchString != null) { + scanBatch = Integer.parseInt(scanBatchString); + } + + String versionString = p.getProperty("versions"); + if (versionString != null) { + versions = Integer.parseInt(versionString); + } + + String startTimeString = p.getProperty("startTimestamp"); + if (startTimeString != null) { + startTime = Long.parseLong(startTimeString); + } + + String endTimeString = p.getProperty("endTimestamp"); + if (endTimeString != null) { + endTime = Long.parseLong(endTimeString); + } + } + + Scan initScan() throws IOException { + Scan scan = new 
Scan();
+      scan.setCacheBlocks(false);
+      if (startTime != 0 || endTime != 0) {
+        scan.setTimeRange(startTime, endTime == 0 ? HConstants.LATEST_TIMESTAMP : endTime);
+      }
+      if (scanBatch > 0) {
+        scan.setBatch(scanBatch);
+      }
+      if (versions >= 0) {
+        scan.setMaxVersions(versions);
+      }
+      if (!isTableStartRow(startRow)) {
+        scan.setStartRow(startRow);
+      }
+      if (!isTableEndRow(stopRow)) {
+        scan.setStopRow(stopRow);
+      }
+      if (families != null) {
+        for (String fam : families.split(",")) {
+          scan.addFamily(Bytes.toBytes(fam));
+        }
+      }
+      return scan;
+    }
+
+    /**
+     * Choose partitions between row ranges so that each range hashes to a single
+     * output file. Selects the region boundaries that fall within the scan range,
+     * and groups them into the desired number of partitions.
+     */
+    void selectPartitions(Pair<byte[][], byte[][]> regionStartEndKeys) {
+      List<byte[]> startKeys = new ArrayList<>();
+      for (int i = 0; i < regionStartEndKeys.getFirst().length; i++) {
+        byte[] regionStartKey = regionStartEndKeys.getFirst()[i];
+        byte[] regionEndKey = regionStartEndKeys.getSecond()[i];
+
+        // drop this region if the scan begins after it ends, or ends before it
+        // begins; in other words:
+        // IF (scan begins before the end of this region
+        //     AND scan ends after the start of this region)
+        // THEN include this region
+        if ((isTableStartRow(startRow) || isTableEndRow(regionEndKey)
+            || Bytes.compareTo(startRow, regionEndKey) < 0)
+            && (isTableEndRow(stopRow) || isTableStartRow(regionStartKey)
+            || Bytes.compareTo(stopRow, regionStartKey) > 0)) {
+          startKeys.add(regionStartKey);
+        }
+      }
+
+      int numRegions = startKeys.size();
+      if (numHashFiles == 0) {
+        numHashFiles = numRegions / 100;
+      }
+      if (numHashFiles == 0) {
+        numHashFiles = 1;
+      }
+      if (numHashFiles > numRegions) {
+        // can't partition within regions
+        numHashFiles = numRegions;
+      }
+
+      // choose a subset of start keys to group regions into ranges
+      partitions = new ArrayList<>(numHashFiles - 1);
+      // skip the first start key as it is not a partition between ranges.
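+      // Example: with numRegions = 10 and numHashFiles = 3, the loop below
+      // picks splitIndex = 10*1/3 = 3 and 10*2/3 = 6 (integer division), so
+      // the partition keys are startKeys[3] and startKeys[6] and the three
+      // hash files cover regions [0,3), [3,6) and [6,10).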
+ for (long i = 1; i < numHashFiles; i++) { + int splitIndex = (int) (numRegions * i / numHashFiles); + partitions.add(new ImmutableBytesWritable(startKeys.get(splitIndex))); + } + } + + void writePartitionFile(Configuration conf, Path path) throws IOException { + FileSystem fs = path.getFileSystem(conf); + @SuppressWarnings("deprecation") + SequenceFile.Writer writer = SequenceFile.createWriter( + fs, conf, path, ImmutableBytesWritable.class, NullWritable.class); + + for (int i = 0; i < partitions.size(); i++) { + writer.append(partitions.get(i), NullWritable.get()); + } + writer.close(); + } + + private void readPartitionFile(FileSystem fs, Configuration conf, Path path) + throws IOException { + @SuppressWarnings("deprecation") + SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf); + ImmutableBytesWritable key = new ImmutableBytesWritable(); + partitions = new ArrayList<>(); + while (reader.next(key)) { + partitions.add(new ImmutableBytesWritable(key.copyBytes())); + } + reader.close(); + + if (!Ordering.natural().isOrdered(partitions)) { + throw new IOException("Partitions are not ordered!"); + } + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("tableName=").append(tableName); + if (families != null) { + sb.append(", families=").append(families); + } + sb.append(", batchSize=").append(batchSize); + sb.append(", numHashFiles=").append(numHashFiles); + if (!isTableStartRow(startRow)) { + sb.append(", startRowHex=").append(Bytes.toHex(startRow)); + } + if (!isTableEndRow(stopRow)) { + sb.append(", stopRowHex=").append(Bytes.toHex(stopRow)); + } + if (scanBatch >= 0) { + sb.append(", scanBatch=").append(scanBatch); + } + if (versions >= 0) { + sb.append(", versions=").append(versions); + } + if (startTime != 0) { + sb.append("startTime=").append(startTime); + } + if (endTime != 0) { + sb.append("endTime=").append(endTime); + } + return sb.toString(); + } + + static String getDataFileName(int hashFileIndex) { + return String.format(HashTable.OUTPUT_DATA_FILE_PREFIX + "%05d", hashFileIndex); + } + + /** + * Open a TableHash.Reader starting at the first hash at or after the given key. + * @throws IOException + */ + public Reader newReader(Configuration conf, ImmutableBytesWritable startKey) + throws IOException { + return new Reader(conf, startKey); + } + + public class Reader implements java.io.Closeable { + private final Configuration conf; + + private int hashFileIndex; + private MapFile.Reader mapFileReader; + + private boolean cachedNext; + private ImmutableBytesWritable key; + private ImmutableBytesWritable hash; + + Reader(Configuration conf, ImmutableBytesWritable startKey) throws IOException { + this.conf = conf; + int partitionIndex = Collections.binarySearch(partitions, startKey); + if (partitionIndex >= 0) { + // if the key is equal to a partition, then go the file after that partition + hashFileIndex = partitionIndex+1; + } else { + // if the key is between partitions, then go to the file between those partitions + hashFileIndex = -1-partitionIndex; + } + openHashFile(); + + // MapFile's don't make it easy to seek() so that the subsequent next() returns + // the desired key/value pair. So we cache it for the first call of next(). + hash = new ImmutableBytesWritable(); + key = (ImmutableBytesWritable) mapFileReader.getClosest(startKey, hash); + if (key == null) { + cachedNext = false; + hash = null; + } else { + cachedNext = true; + } + } + + /** + * Read the next key/hash pair. 
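+       * When the current hash file is exhausted, the reader transparently
+       * opens the next one and continues.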
+ * Returns true if such a pair exists and false when at the end of the data. + */ + public boolean next() throws IOException { + if (cachedNext) { + cachedNext = false; + return true; + } + key = new ImmutableBytesWritable(); + hash = new ImmutableBytesWritable(); + while (true) { + boolean hasNext = mapFileReader.next(key, hash); + if (hasNext) { + return true; + } + hashFileIndex++; + if (hashFileIndex < TableHash.this.numHashFiles) { + mapFileReader.close(); + openHashFile(); + } else { + key = null; + hash = null; + return false; + } + } + } + + /** + * Get the current key + * @return the current key or null if there is no current key + */ + public ImmutableBytesWritable getCurrentKey() { + return key; + } + + /** + * Get the current hash + * @return the current hash or null if there is no current hash + */ + public ImmutableBytesWritable getCurrentHash() { + return hash; + } + + private void openHashFile() throws IOException { + if (mapFileReader != null) { + mapFileReader.close(); + } + Path dataDir = new Path(TableHash.this.hashDir, HASH_DATA_DIR); + Path dataFile = new Path(dataDir, getDataFileName(hashFileIndex)); + mapFileReader = new MapFile.Reader(dataFile, conf); + } + + @Override + public void close() throws IOException { + mapFileReader.close(); + } + } + } + + static boolean isTableStartRow(byte[] row) { + return Bytes.equals(HConstants.EMPTY_START_ROW, row); + } + + static boolean isTableEndRow(byte[] row) { + return Bytes.equals(HConstants.EMPTY_END_ROW, row); + } + + public Job createSubmittableJob(String[] args) throws IOException { + Path partitionsPath = new Path(destPath, PARTITIONS_FILE_NAME); + generatePartitions(partitionsPath); + + Job job = Job.getInstance(getConf(), + getConf().get("mapreduce.job.name", "hashTable_" + tableHash.tableName)); + Configuration jobConf = job.getConfiguration(); + jobConf.setLong(HASH_BATCH_SIZE_CONF_KEY, tableHash.batchSize); + job.setJarByClass(HashTable.class); + + TableMapReduceUtil.initTableMapperJob(tableHash.tableName, tableHash.initScan(), + HashMapper.class, ImmutableBytesWritable.class, ImmutableBytesWritable.class, job); + + // use a TotalOrderPartitioner and reducers to group region output into hash files + job.setPartitionerClass(TotalOrderPartitioner.class); + TotalOrderPartitioner.setPartitionFile(jobConf, partitionsPath); + job.setReducerClass(Reducer.class); // identity reducer + job.setNumReduceTasks(tableHash.numHashFiles); + job.setOutputKeyClass(ImmutableBytesWritable.class); + job.setOutputValueClass(ImmutableBytesWritable.class); + job.setOutputFormatClass(MapFileOutputFormat.class); + FileOutputFormat.setOutputPath(job, new Path(destPath, HASH_DATA_DIR)); + + return job; + } + + private void generatePartitions(Path partitionsPath) throws IOException { + Connection connection = ConnectionFactory.createConnection(getConf()); + Pair<byte[][], byte[][]> regionKeys + = connection.getRegionLocator(TableName.valueOf(tableHash.tableName)).getStartEndKeys(); + connection.close(); + + tableHash.selectPartitions(regionKeys); + LOG.info("Writing " + tableHash.partitions.size() + " partition keys to " + partitionsPath); + + tableHash.writePartitionFile(getConf(), partitionsPath); + } + + static class ResultHasher { + private MessageDigest digest; + + private boolean batchStarted = false; + private ImmutableBytesWritable batchStartKey; + private ImmutableBytesWritable batchHash; + private long batchSize = 0; + + + public ResultHasher() { + try { + digest = MessageDigest.getInstance("MD5"); + } catch 
(NoSuchAlgorithmException e) { + Throwables.propagate(e); + } + } + + public void startBatch(ImmutableBytesWritable row) { + if (batchStarted) { + throw new RuntimeException("Cannot start new batch without finishing existing one."); + } + batchStarted = true; + batchSize = 0; + batchStartKey = row; + batchHash = null; + } + + public void hashResult(Result result) { + if (!batchStarted) { + throw new RuntimeException("Cannot add to batch that has not been started."); + } + for (Cell cell : result.rawCells()) { + int rowLength = cell.getRowLength(); + int familyLength = cell.getFamilyLength(); + int qualifierLength = cell.getQualifierLength(); + int valueLength = cell.getValueLength(); + digest.update(cell.getRowArray(), cell.getRowOffset(), rowLength); + digest.update(cell.getFamilyArray(), cell.getFamilyOffset(), familyLength); + digest.update(cell.getQualifierArray(), cell.getQualifierOffset(), qualifierLength); + long ts = cell.getTimestamp(); + for (int i = 8; i > 0; i--) { + digest.update((byte) ts); + ts >>>= 8; + } + digest.update(cell.getValueArray(), cell.getValueOffset(), valueLength); + + batchSize += rowLength + familyLength + qualifierLength + 8 + valueLength; + } + } + + public void finishBatch() { + if (!batchStarted) { + throw new RuntimeException("Cannot finish batch that has not started."); + } + batchStarted = false; + batchHash = new ImmutableBytesWritable(digest.digest()); + } + + public boolean isBatchStarted() { + return batchStarted; + } + + public ImmutableBytesWritable getBatchStartKey() { + return batchStartKey; + } + + public ImmutableBytesWritable getBatchHash() { + return batchHash; + } + + public long getBatchSize() { + return batchSize; + } + } + + public static class HashMapper + extends TableMapper<ImmutableBytesWritable, ImmutableBytesWritable> { + + private ResultHasher hasher; + private long targetBatchSize; + + private ImmutableBytesWritable currentRow; + + @Override + protected void setup(Context context) throws IOException, InterruptedException { + targetBatchSize = context.getConfiguration() + .getLong(HASH_BATCH_SIZE_CONF_KEY, DEFAULT_BATCH_SIZE); + hasher = new ResultHasher(); + + TableSplit split = (TableSplit) context.getInputSplit(); + hasher.startBatch(new ImmutableBytesWritable(split.getStartRow())); + } + + @Override + protected void map(ImmutableBytesWritable key, Result value, Context context) + throws IOException, InterruptedException { + + if (currentRow == null || !currentRow.equals(key)) { + currentRow = new ImmutableBytesWritable(key); // not immutable + + if (hasher.getBatchSize() >= targetBatchSize) { + hasher.finishBatch(); + context.write(hasher.getBatchStartKey(), hasher.getBatchHash()); + hasher.startBatch(currentRow); + } + } + + hasher.hashResult(value); + } + + @Override + protected void cleanup(Context context) throws IOException, InterruptedException { + hasher.finishBatch(); + context.write(hasher.getBatchStartKey(), hasher.getBatchHash()); + } + } + + private void writeTempManifestFile() throws IOException { + Path tempManifestPath = new Path(destPath, TMP_MANIFEST_FILE_NAME); + FileSystem fs = tempManifestPath.getFileSystem(getConf()); + tableHash.writePropertiesFile(fs, tempManifestPath); + } + + private void completeManifest() throws IOException { + Path tempManifestPath = new Path(destPath, TMP_MANIFEST_FILE_NAME); + Path manifestPath = new Path(destPath, MANIFEST_FILE_NAME); + FileSystem fs = tempManifestPath.getFileSystem(getConf()); + fs.rename(tempManifestPath, manifestPath); + } + + private static final int 
NUM_ARGS = 2; + private static void printUsage(final String errorMsg) { + if (errorMsg != null && errorMsg.length() > 0) { + System.err.println("ERROR: " + errorMsg); + System.err.println(); + } + System.err.println("Usage: HashTable [options] <tablename> <outputpath>"); + System.err.println(); + System.err.println("Options:"); + System.err.println(" batchsize the target amount of bytes to hash in each batch"); + System.err.println(" rows are added to the batch until this size is reached"); + System.err.println(" (defaults to " + DEFAULT_BATCH_SIZE + " bytes)"); + System.err.println(" numhashfiles the number of hash files to create"); + System.err.println(" if set to fewer than number of regions then"); + System.err.println(" the job will create this number of reducers"); + System.err.println(" (defaults to 1/100 of regions -- at least 1)"); + System.err.println(" startrow the start row"); + System.err.println(" stoprow the stop row"); + System.err.println(" starttime beginning of the time range (unixtime in millis)"); + System.err.println(" without endtime means from starttime to forever"); + System.err.println(" endtime end of the time range. Ignored if no starttime specified."); + System.err.println(" scanbatch scanner batch size to support intra row scans"); + System.err.println(" versions number of cell versions to include"); + System.err.println(" families comma-separated list of families to include"); + System.err.println(); + System.err.println("Args:"); + System.err.println(" tablename Name of the table to hash"); + System.err.println(" outputpath Filesystem path to put the output data"); + System.err.println(); + System.err.println("Examples:"); + System.err.println(" To hash 'TestTable' in 32kB batches for a 1 hour window into 50 files:"); + System.err.println(" $ hbase " + + "org.apache.hadoop.hbase.mapreduce.HashTable --batchsize=32000 --numhashfiles=50" + + " --starttime=1265875194289 --endtime=1265878794289 --families=cf2,cf3" + + " TestTable /hashes/testTable"); + } + + private boolean doCommandLine(final String[] args) { + if (args.length < NUM_ARGS) { + printUsage(null); + return false; + } + try { + + tableHash.tableName = args[args.length-2]; + destPath = new Path(args[args.length-1]); + + for (int i = 0; i < args.length - NUM_ARGS; i++) { + String cmd = args[i]; + if (cmd.equals("-h") || cmd.startsWith("--h")) { + printUsage(null); + return false; + } + + final String batchSizeArgKey = "--batchsize="; + if (cmd.startsWith(batchSizeArgKey)) { + tableHash.batchSize = Long.parseLong(cmd.substring(batchSizeArgKey.length())); + continue; + } + + final String numHashFilesArgKey = "--numhashfiles="; + if (cmd.startsWith(numHashFilesArgKey)) { + tableHash.numHashFiles = Integer.parseInt(cmd.substring(numHashFilesArgKey.length())); + continue; + } + + final String startRowArgKey = "--startrow="; + if (cmd.startsWith(startRowArgKey)) { + tableHash.startRow = Bytes.fromHex(cmd.substring(startRowArgKey.length())); + continue; + } + + final String stopRowArgKey = "--stoprow="; + if (cmd.startsWith(stopRowArgKey)) { + tableHash.stopRow = Bytes.fromHex(cmd.substring(stopRowArgKey.length())); + continue; + } + + final String startTimeArgKey = "--starttime="; + if (cmd.startsWith(startTimeArgKey)) { + tableHash.startTime = Long.parseLong(cmd.substring(startTimeArgKey.length())); + continue; + } + + final String endTimeArgKey = "--endtime="; + if (cmd.startsWith(endTimeArgKey)) { + tableHash.endTime = Long.parseLong(cmd.substring(endTimeArgKey.length())); + continue; + } + + final 
String scanBatchArgKey = "--scanbatch="; + if (cmd.startsWith(scanBatchArgKey)) { + tableHash.scanBatch = Integer.parseInt(cmd.substring(scanBatchArgKey.length())); + continue; + } + + final String versionsArgKey = "--versions="; + if (cmd.startsWith(versionsArgKey)) { + tableHash.versions = Integer.parseInt(cmd.substring(versionsArgKey.length())); + continue; + } + + final String familiesArgKey = "--families="; + if (cmd.startsWith(familiesArgKey)) { + tableHash.families = cmd.substring(familiesArgKey.length()); + continue; + } + + printUsage("Invalid argument '" + cmd + "'"); + return false; + } + if ((tableHash.startTime != 0 || tableHash.endTime != 0) + && (tableHash.startTime >= tableHash.endTime)) { + printUsage("Invalid time range filter: starttime=" + + tableHash.startTime + " >= endtime=" + tableHash.endTime); + return false; + } + + } catch (Exception e) { + e.printStackTrace(); + printUsage("Can't start because " + e.getMessage()); + return false; + } + return true; + } + + /** + * Main entry point. + */ + public static void main(String[] args) throws Exception { + int ret = ToolRunner.run(new HashTable(HBaseConfiguration.create()), args); + System.exit(ret); + } + + @Override + public int run(String[] args) throws Exception { + String[] otherArgs = new GenericOptionsParser(getConf(), args).getRemainingArgs(); + if (!doCommandLine(otherArgs)) { + return 1; + } + + Job job = createSubmittableJob(otherArgs); + writeTempManifestFile(); + if (!job.waitForCompletion(true)) { + LOG.info("Map-reduce job failed!"); + return 1; + } + completeManifest(); + return 0; + } + +} http://git-wip-us.apache.org/repos/asf/hbase/blob/59d03410/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/IdentityTableMapper.java ---------------------------------------------------------------------- diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/IdentityTableMapper.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/IdentityTableMapper.java new file mode 100644 index 0000000..7103ef8 --- /dev/null +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/IdentityTableMapper.java @@ -0,0 +1,67 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mapreduce; + +import java.io.IOException; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.mapreduce.Job; + +/** + * Pass the given key and record as-is to the reduce phase. 
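+ * Useful for jobs in which all of the real work is done on the reduce side.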
+ */ +@InterfaceAudience.Public +public class IdentityTableMapper +extends TableMapper<ImmutableBytesWritable, Result> { + + /** + * Use this before submitting a TableMap job. It will appropriately set up + * the job. + * + * @param table The table name. + * @param scan The scan with the columns to scan. + * @param mapper The mapper class. + * @param job The job configuration. + * @throws IOException When setting up the job fails. + */ + @SuppressWarnings("rawtypes") + public static void initJob(String table, Scan scan, + Class<? extends TableMapper> mapper, Job job) throws IOException { + TableMapReduceUtil.initTableMapperJob(table, scan, mapper, + ImmutableBytesWritable.class, Result.class, job); + } + + /** + * Pass the key, value to reduce. + * + * @param key The current key. + * @param value The current value. + * @param context The current context. + * @throws IOException When writing the record fails. + * @throws InterruptedException When the job is aborted. + */ + public void map(ImmutableBytesWritable key, Result value, Context context) + throws IOException, InterruptedException { + context.write(key, value); + } + +} http://git-wip-us.apache.org/repos/asf/hbase/blob/59d03410/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/IdentityTableReducer.java ---------------------------------------------------------------------- diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/IdentityTableReducer.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/IdentityTableReducer.java new file mode 100644 index 0000000..73475db --- /dev/null +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/IdentityTableReducer.java @@ -0,0 +1,79 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mapreduce; + +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.io.Writable; + +/** + * Convenience class that simply writes all values (which must be + * {@link org.apache.hadoop.hbase.client.Put Put} or + * {@link org.apache.hadoop.hbase.client.Delete Delete} instances) + * passed to it out to the configured HBase table. This works in combination + * with {@link TableOutputFormat} which actually does the writing to HBase.<p> + * + * Keys are passed along but ignored in TableOutputFormat. However, they can + * be used to control how your values will be divided up amongst the specified + * number of reducers. 
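+ * For example, {@link HRegionPartitioner} groups keys by the table's current
+ * regions, so that each reducer loads a single region.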
<p> + * + * You can also use the {@link TableMapReduceUtil} class to set up the two + * classes in one step: + * <blockquote><code> + * TableMapReduceUtil.initTableReducerJob("table", IdentityTableReducer.class, job); + * </code></blockquote> + * This will also set the proper {@link TableOutputFormat} which is given the + * <code>table</code> parameter. The + * {@link org.apache.hadoop.hbase.client.Put Put} or + * {@link org.apache.hadoop.hbase.client.Delete Delete} define the + * row and columns implicitly. + */ +@InterfaceAudience.Public +public class IdentityTableReducer +extends TableReducer<Writable, Mutation, Writable> { + + @SuppressWarnings("unused") + private static final Log LOG = LogFactory.getLog(IdentityTableReducer.class); + + /** + * Writes each given record, consisting of the row key and the given values, + * to the configured {@link org.apache.hadoop.mapreduce.OutputFormat}. + * It is emitting the row key and each {@link org.apache.hadoop.hbase.client.Put Put} + * or {@link org.apache.hadoop.hbase.client.Delete Delete} as separate pairs. + * + * @param key The current row key. + * @param values The {@link org.apache.hadoop.hbase.client.Put Put} or + * {@link org.apache.hadoop.hbase.client.Delete Delete} list for the given + * row. + * @param context The context of the reduce. + * @throws IOException When writing the record fails. + * @throws InterruptedException When the job gets interrupted. + */ + @Override + public void reduce(Writable key, Iterable<Mutation> values, Context context) + throws IOException, InterruptedException { + for(Mutation putOrDelete : values) { + context.write(key, putOrDelete); + } + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/59d03410/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java ---------------------------------------------------------------------- diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java new file mode 100644 index 0000000..18dcf35 --- /dev/null +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java @@ -0,0 +1,780 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.mapreduce; + +import java.io.ByteArrayInputStream; +import java.io.DataInput; +import java.io.DataInputStream; +import java.io.DataOutput; +import java.io.IOException; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.TreeMap; +import java.util.UUID; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellComparator; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.KeyValueUtil; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.ZooKeeperConnectionException; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Durability; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.RegionLocator; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.filter.Filter; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.zookeeper.ZKClusterId; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; +import org.apache.hadoop.io.RawComparator; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.io.WritableComparator; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.Partitioner; +import org.apache.hadoop.mapreduce.Reducer; +import org.apache.hadoop.mapreduce.TaskCounter; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; +import org.apache.zookeeper.KeeperException; + + +/** + * Import data written by {@link Export}. 
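+ * Run it as <code>hbase org.apache.hadoop.hbase.mapreduce.Import [options]
+ * &lt;tablename&gt; &lt;inputdir&gt;</code>; see <code>usage()</code> below for the options.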
+ */
+@InterfaceAudience.Public
+public class Import extends Configured implements Tool {
+  private static final Log LOG = LogFactory.getLog(Import.class);
+  final static String NAME = "import";
+  public final static String CF_RENAME_PROP = "HBASE_IMPORTER_RENAME_CFS";
+  public final static String BULK_OUTPUT_CONF_KEY = "import.bulk.output";
+  public final static String FILTER_CLASS_CONF_KEY = "import.filter.class";
+  public final static String FILTER_ARGS_CONF_KEY = "import.filter.args";
+  public final static String TABLE_NAME = "import.table.name";
+  public final static String WAL_DURABILITY = "import.wal.durability";
+  public final static String HAS_LARGE_RESULT = "import.bulk.hasLargeResult";
+
+  private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name";
+
+  public static class KeyValueWritableComparablePartitioner
+      extends Partitioner<KeyValueWritableComparable, KeyValue> {
+    private static KeyValueWritableComparable[] START_KEYS = null;
+    @Override
+    public int getPartition(KeyValueWritableComparable key, KeyValue value,
+        int numPartitions) {
+      for (int i = 0; i < START_KEYS.length; ++i) {
+        if (key.compareTo(START_KEYS[i]) <= 0) {
+          return i;
+        }
+      }
+      return START_KEYS.length;
+    }
+
+  }
+
+  public static class KeyValueWritableComparable
+      implements WritableComparable<KeyValueWritableComparable> {
+
+    private KeyValue kv = null;
+
+    static {
+      // register this comparator
+      WritableComparator.define(KeyValueWritableComparable.class,
+          new KeyValueWritableComparator());
+    }
+
+    public KeyValueWritableComparable() {
+    }
+
+    public KeyValueWritableComparable(KeyValue kv) {
+      this.kv = kv;
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+      KeyValue.write(kv, out);
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+      kv = KeyValue.create(in);
+    }
+
+    @Override
+    @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="EQ_COMPARETO_USE_OBJECT_EQUALS",
+        justification="This is wrong, yes, but we should be purging Writables, not fixing them")
+    public int compareTo(KeyValueWritableComparable o) {
+      return CellComparator.COMPARATOR.compare(this.kv, ((KeyValueWritableComparable)o).kv);
+    }
+
+    public static class KeyValueWritableComparator extends WritableComparator {
+
+      @Override
+      public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+        try {
+          KeyValueWritableComparable kv1 = new KeyValueWritableComparable();
+          kv1.readFields(new DataInputStream(new ByteArrayInputStream(b1, s1, l1)));
+          KeyValueWritableComparable kv2 = new KeyValueWritableComparable();
+          kv2.readFields(new DataInputStream(new ByteArrayInputStream(b2, s2, l2)));
+          return compare(kv1, kv2);
+        } catch (IOException e) {
+          throw new RuntimeException(e);
+        }
+      }
+
+    }
+
+  }
+
+  public static class KeyValueReducer
+      extends
+      Reducer<KeyValueWritableComparable, KeyValue, ImmutableBytesWritable, KeyValue> {
+    protected void reduce(
+        KeyValueWritableComparable row,
+        Iterable<KeyValue> kvs,
+        Reducer<KeyValueWritableComparable,
+            KeyValue, ImmutableBytesWritable, KeyValue>.Context context)
+        throws java.io.IOException, InterruptedException {
+      int index = 0;
+      for (KeyValue kv : kvs) {
+        context.write(new ImmutableBytesWritable(kv.getRowArray()), kv);
+        if (++index % 100 == 0) {
+          context.setStatus("Wrote " + index + " KeyValues, "
+              + "and the row key currently being written is " + Bytes.toString(kv.getRowArray()));
+        }
+      }
+    }
+  }
+
+  public static class KeyValueSortImporter
+      extends TableMapper<KeyValueWritableComparable, KeyValue> {
+    private
Map<byte[], byte[]> cfRenameMap; + private Filter filter; + private static final Log LOG = LogFactory.getLog(KeyValueImporter.class); + + /** + * @param row The current table row key. + * @param value The columns. + * @param context The current context. + * @throws IOException When something is broken with the data. + */ + @Override + public void map(ImmutableBytesWritable row, Result value, + Context context) + throws IOException { + try { + if (LOG.isTraceEnabled()) { + LOG.trace("Considering the row." + + Bytes.toString(row.get(), row.getOffset(), row.getLength())); + } + if (filter == null + || !filter.filterRowKey(CellUtil.createFirstOnRow(row.get(), row.getOffset(), + (short) row.getLength()))) { + for (Cell kv : value.rawCells()) { + kv = filterKv(filter, kv); + // skip if we filtered it out + if (kv == null) continue; + // TODO get rid of ensureKeyValue + KeyValue ret = KeyValueUtil.ensureKeyValue(convertKv(kv, cfRenameMap)); + context.write(new KeyValueWritableComparable(ret.createKeyOnly(false)), ret); + } + } + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + @Override + public void setup(Context context) throws IOException { + cfRenameMap = createCfRenameMap(context.getConfiguration()); + filter = instantiateFilter(context.getConfiguration()); + int reduceNum = context.getNumReduceTasks(); + Configuration conf = context.getConfiguration(); + TableName tableName = TableName.valueOf(context.getConfiguration().get(TABLE_NAME)); + try (Connection conn = ConnectionFactory.createConnection(conf); + RegionLocator regionLocator = conn.getRegionLocator(tableName)) { + byte[][] startKeys = regionLocator.getStartKeys(); + if (startKeys.length != reduceNum) { + throw new IOException("Region split after job initialization"); + } + KeyValueWritableComparable[] startKeyWraps = + new KeyValueWritableComparable[startKeys.length - 1]; + for (int i = 1; i < startKeys.length; ++i) { + startKeyWraps[i - 1] = + new KeyValueWritableComparable(KeyValueUtil.createFirstOnRow(startKeys[i])); + } + KeyValueWritableComparablePartitioner.START_KEYS = startKeyWraps; + } + } + } + + /** + * A mapper that just writes out KeyValues. + */ + @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="EQ_COMPARETO_USE_OBJECT_EQUALS", + justification="Writables are going away and this has been this way forever") + public static class KeyValueImporter extends TableMapper<ImmutableBytesWritable, KeyValue> { + private Map<byte[], byte[]> cfRenameMap; + private Filter filter; + private static final Log LOG = LogFactory.getLog(KeyValueImporter.class); + + /** + * @param row The current table row key. + * @param value The columns. + * @param context The current context. + * @throws IOException When something is broken with the data. + */ + @Override + public void map(ImmutableBytesWritable row, Result value, + Context context) + throws IOException { + try { + if (LOG.isTraceEnabled()) { + LOG.trace("Considering the row." 
+            + Bytes.toString(row.get(), row.getOffset(), row.getLength()));
+        }
+        if (filter == null
+            || !filter.filterRowKey(CellUtil.createFirstOnRow(row.get(), row.getOffset(),
+                (short) row.getLength()))) {
+          for (Cell kv : value.rawCells()) {
+            kv = filterKv(filter, kv);
+            // skip if we filtered it out
+            if (kv == null) continue;
+            // TODO get rid of ensureKeyValue
+            context.write(row, KeyValueUtil.ensureKeyValue(convertKv(kv, cfRenameMap)));
+          }
+        }
+      } catch (InterruptedException e) {
+        e.printStackTrace();
+      }
+    }
+
+    @Override
+    public void setup(Context context) {
+      cfRenameMap = createCfRenameMap(context.getConfiguration());
+      filter = instantiateFilter(context.getConfiguration());
+    }
+  }
+
+  /**
+   * Write table content out to files in hdfs.
+   */
+  public static class Importer extends TableMapper<ImmutableBytesWritable, Mutation> {
+    private Map<byte[], byte[]> cfRenameMap;
+    private List<UUID> clusterIds;
+    private Filter filter;
+    private Durability durability;
+
+    /**
+     * @param row The current table row key.
+     * @param value The columns.
+     * @param context The current context.
+     * @throws IOException When something is broken with the data.
+     */
+    @Override
+    public void map(ImmutableBytesWritable row, Result value,
+        Context context)
+        throws IOException {
+      try {
+        writeResult(row, value, context);
+      } catch (InterruptedException e) {
+        e.printStackTrace();
+      }
+    }
+
+    private void writeResult(ImmutableBytesWritable key, Result result, Context context)
+        throws IOException, InterruptedException {
+      Put put = null;
+      Delete delete = null;
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Considering the row: "
+            + Bytes.toString(key.get(), key.getOffset(), key.getLength()));
+      }
+      if (filter == null
+          || !filter.filterRowKey(CellUtil.createFirstOnRow(key.get(), key.getOffset(),
+              (short) key.getLength()))) {
+        processKV(key, result, context, put, delete);
+      }
+    }
+
+    protected void processKV(ImmutableBytesWritable key, Result result, Context context, Put put,
+        Delete delete) throws IOException, InterruptedException {
+      for (Cell kv : result.rawCells()) {
+        kv = filterKv(filter, kv);
+        // skip if we filter it out
+        if (kv == null) continue;
+
+        kv = convertKv(kv, cfRenameMap);
+        // Deletes and Puts are gathered and written when finished
+        /*
+         * If an Export contains a sequence of mutations and tombstones, the Import
+         * must restore the same sequence as-is. If we combined all Delete tombstones
+         * into a single request, some DeleteFamily tombstones could be dropped,
+         * because when multiple DeleteFamily tombstones are submitted in a single
+         * Delete request, only the newest one is kept in the hbase table and the
+         * others are ignored. See HBASE-12065.
+         */
+        if (CellUtil.isDeleteFamily(kv)) {
+          Delete deleteFamily = new Delete(key.get());
+          deleteFamily.add(kv);
+          if (durability != null) {
+            deleteFamily.setDurability(durability);
+          }
+          deleteFamily.setClusterIds(clusterIds);
+          context.write(key, deleteFamily);
+        } else if (CellUtil.isDelete(kv)) {
+          if (delete == null) {
+            delete = new Delete(key.get());
+          }
+          delete.add(kv);
+        } else {
+          if (put == null) {
+            put = new Put(key.get());
+          }
+          addPutToKv(put, kv);
+        }
+      }
+      if (put != null) {
+        if (durability != null) {
+          put.setDurability(durability);
+        }
+        put.setClusterIds(clusterIds);
+        context.write(key, put);
+      }
+      if (delete != null) {
+        if (durability != null) {
+          delete.setDurability(durability);
+        }
+        delete.setClusterIds(clusterIds);
+        context.write(key, delete);
+      }
+    }
+
+    protected void addPutToKv(Put put, Cell kv) throws IOException {
+      put.add(kv);
+    }
+
+    @Override
+    public void setup(Context context) {
+      LOG.info("Setting up " + getClass() + " mapper.");
+      Configuration conf = context.getConfiguration();
+      cfRenameMap = createCfRenameMap(conf);
+      filter = instantiateFilter(conf);
+      String durabilityStr = conf.get(WAL_DURABILITY);
+      if (durabilityStr != null) {
+        durability = Durability.valueOf(durabilityStr.toUpperCase(Locale.ROOT));
+        LOG.info("setting WAL durability to " + durability);
+      } else {
+        LOG.info("setting WAL durability to default.");
+      }
+      // TODO: This is kind of ugly doing setup of ZKW just to read the clusterid.
+      ZooKeeperWatcher zkw = null;
+      Exception ex = null;
+      try {
+        zkw = new ZooKeeperWatcher(conf, context.getTaskAttemptID().toString(), null);
+        clusterIds = Collections.singletonList(ZKClusterId.getUUIDForCluster(zkw));
+      } catch (ZooKeeperConnectionException e) {
+        ex = e;
+        LOG.error("Problem connecting to ZooKeeper during task setup", e);
+      } catch (KeeperException e) {
+        ex = e;
+        LOG.error("Problem reading ZooKeeper data during task setup", e);
+      } catch (IOException e) {
+        ex = e;
+        LOG.error("Problem setting up task", e);
+      } finally {
+        if (zkw != null) zkw.close();
+      }
+      if (clusterIds == null) {
+        // exit early if setup failed
+        throw new RuntimeException(ex);
+      }
+    }
+  }
+
+  /**
+   * Create a {@link Filter} to apply to all incoming keys ({@link KeyValue KeyValues}),
+   * optionally excluding some of them from the job output.
+   * @param conf {@link Configuration} from which to load the filter
+   * @return the filter to use for the task, or <tt>null</tt> if no filter should be used
+   * @throws IllegalArgumentException if the filter is misconfigured
+   */
+  public static Filter instantiateFilter(Configuration conf) {
+    // get the filter, if it was configured
+    Class<?
extends Filter> filterClass = conf.getClass(FILTER_CLASS_CONF_KEY, null, Filter.class); + if (filterClass == null) { + LOG.debug("No configured filter class, accepting all keyvalues."); + return null; + } + LOG.debug("Attempting to create filter:" + filterClass); + String[] filterArgs = conf.getStrings(FILTER_ARGS_CONF_KEY); + ArrayList<byte[]> quotedArgs = toQuotedByteArrays(filterArgs); + try { + Method m = filterClass.getMethod("createFilterFromArguments", ArrayList.class); + return (Filter) m.invoke(null, quotedArgs); + } catch (IllegalAccessException e) { + LOG.error("Couldn't instantiate filter!", e); + throw new RuntimeException(e); + } catch (SecurityException e) { + LOG.error("Couldn't instantiate filter!", e); + throw new RuntimeException(e); + } catch (NoSuchMethodException e) { + LOG.error("Couldn't instantiate filter!", e); + throw new RuntimeException(e); + } catch (IllegalArgumentException e) { + LOG.error("Couldn't instantiate filter!", e); + throw new RuntimeException(e); + } catch (InvocationTargetException e) { + LOG.error("Couldn't instantiate filter!", e); + throw new RuntimeException(e); + } + } + + private static ArrayList<byte[]> toQuotedByteArrays(String... stringArgs) { + ArrayList<byte[]> quotedArgs = new ArrayList<>(); + for (String stringArg : stringArgs) { + // all the filters' instantiation methods expected quoted args since they are coming from + // the shell, so add them here, though it shouldn't really be needed :-/ + quotedArgs.add(Bytes.toBytes("'" + stringArg + "'")); + } + return quotedArgs; + } + + /** + * Attempt to filter out the keyvalue + * @param kv {@link KeyValue} on which to apply the filter + * @return <tt>null</tt> if the key should not be written, otherwise returns the original + * {@link KeyValue} + */ + public static Cell filterKv(Filter filter, Cell kv) throws IOException { + // apply the filter and skip this kv if the filter doesn't apply + if (filter != null) { + Filter.ReturnCode code = filter.filterKeyValue(kv); + if (LOG.isTraceEnabled()) { + LOG.trace("Filter returned:" + code + " for the key value:" + kv); + } + // if its not an accept type, then skip this kv + if (!(code.equals(Filter.ReturnCode.INCLUDE) || code + .equals(Filter.ReturnCode.INCLUDE_AND_NEXT_COL))) { + return null; + } + } + return kv; + } + + // helper: create a new KeyValue based on CF rename map + private static Cell convertKv(Cell kv, Map<byte[], byte[]> cfRenameMap) { + if(cfRenameMap != null) { + // If there's a rename mapping for this CF, create a new KeyValue + byte[] newCfName = cfRenameMap.get(CellUtil.cloneFamily(kv)); + if(newCfName != null) { + kv = new KeyValue(kv.getRowArray(), // row buffer + kv.getRowOffset(), // row offset + kv.getRowLength(), // row length + newCfName, // CF buffer + 0, // CF offset + newCfName.length, // CF length + kv.getQualifierArray(), // qualifier buffer + kv.getQualifierOffset(), // qualifier offset + kv.getQualifierLength(), // qualifier length + kv.getTimestamp(), // timestamp + KeyValue.Type.codeToType(kv.getTypeByte()), // KV Type + kv.getValueArray(), // value buffer + kv.getValueOffset(), // value offset + kv.getValueLength()); // value length + } + } + return kv; + } + + // helper: make a map from sourceCfName to destCfName by parsing a config key + private static Map<byte[], byte[]> createCfRenameMap(Configuration conf) { + Map<byte[], byte[]> cfRenameMap = null; + String allMappingsPropVal = conf.get(CF_RENAME_PROP); + if(allMappingsPropVal != null) { + // The conf value format should be 
sourceCf1:destCf1,sourceCf2:destCf2,... + String[] allMappings = allMappingsPropVal.split(","); + for (String mapping: allMappings) { + if(cfRenameMap == null) { + cfRenameMap = new TreeMap<>(Bytes.BYTES_COMPARATOR); + } + String [] srcAndDest = mapping.split(":"); + if(srcAndDest.length != 2) { + continue; + } + cfRenameMap.put(srcAndDest[0].getBytes(), srcAndDest[1].getBytes()); + } + } + return cfRenameMap; + } + + /** + * <p>Sets a configuration property with key {@link #CF_RENAME_PROP} in conf that tells + * the mapper how to rename column families. + * + * <p>Alternately, instead of calling this function, you could set the configuration key + * {@link #CF_RENAME_PROP} yourself. The value should look like + * <pre>srcCf1:destCf1,srcCf2:destCf2,....</pre>. This would have the same effect on + * the mapper behavior. + * + * @param conf the Configuration in which the {@link #CF_RENAME_PROP} key will be + * set + * @param renameMap a mapping from source CF names to destination CF names + */ + static public void configureCfRenaming(Configuration conf, + Map<String, String> renameMap) { + StringBuilder sb = new StringBuilder(); + for(Map.Entry<String,String> entry: renameMap.entrySet()) { + String sourceCf = entry.getKey(); + String destCf = entry.getValue(); + + if(sourceCf.contains(":") || sourceCf.contains(",") || + destCf.contains(":") || destCf.contains(",")) { + throw new IllegalArgumentException("Illegal character in CF names: " + + sourceCf + ", " + destCf); + } + + if(sb.length() != 0) { + sb.append(","); + } + sb.append(sourceCf + ":" + destCf); + } + conf.set(CF_RENAME_PROP, sb.toString()); + } + + /** + * Add a Filter to be instantiated on import + * @param conf Configuration to update (will be passed to the job) + * @param clazz {@link Filter} subclass to instantiate on the server. + * @param filterArgs List of arguments to pass to the filter on instantiation + */ + public static void addFilterAndArguments(Configuration conf, Class<? extends Filter> clazz, + List<String> filterArgs) throws IOException { + conf.set(Import.FILTER_CLASS_CONF_KEY, clazz.getName()); + conf.setStrings(Import.FILTER_ARGS_CONF_KEY, filterArgs.toArray(new String[filterArgs.size()])); + } + + /** + * Sets up the actual job. + * @param conf The current configuration. + * @param args The command line parameters. + * @return The newly created job. + * @throws IOException When setting up the job fails. + */ + public static Job createSubmittableJob(Configuration conf, String[] args) + throws IOException { + TableName tableName = TableName.valueOf(args[0]); + conf.set(TABLE_NAME, tableName.getNameAsString()); + Path inputDir = new Path(args[1]); + Job job = Job.getInstance(conf, conf.get(JOB_NAME_CONF_KEY, NAME + "_" + tableName)); + job.setJarByClass(Importer.class); + FileInputFormat.setInputPaths(job, inputDir); + job.setInputFormatClass(SequenceFileInputFormat.class); + String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY); + + // make sure we get the filter in the jars + try { + Class<? 
+
+  /**
+   * Sets up the actual job.
+   * @param conf The current configuration.
+   * @param args The command line parameters.
+   * @return The newly created job.
+   * @throws IOException When setting up the job fails.
+   */
+  public static Job createSubmittableJob(Configuration conf, String[] args)
+      throws IOException {
+    TableName tableName = TableName.valueOf(args[0]);
+    conf.set(TABLE_NAME, tableName.getNameAsString());
+    Path inputDir = new Path(args[1]);
+    Job job = Job.getInstance(conf, conf.get(JOB_NAME_CONF_KEY, NAME + "_" + tableName));
+    job.setJarByClass(Importer.class);
+    FileInputFormat.setInputPaths(job, inputDir);
+    job.setInputFormatClass(SequenceFileInputFormat.class);
+    String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY);
+
+    // make sure we get the filter in the jars
+    try {
+      Class<? extends Filter> filter = conf.getClass(FILTER_CLASS_CONF_KEY, null, Filter.class);
+      if (filter != null) {
+        TableMapReduceUtil.addDependencyJarsForClasses(conf, filter);
+      }
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+
+    if (hfileOutPath != null && conf.getBoolean(HAS_LARGE_RESULT, false)) {
+      LOG.info("Using large result mode.");
+      try (Connection conn = ConnectionFactory.createConnection(conf);
+          Table table = conn.getTable(tableName);
+          RegionLocator regionLocator = conn.getRegionLocator(tableName)) {
+        HFileOutputFormat2.configureIncrementalLoad(job, table.getDescriptor(), regionLocator);
+        job.setMapperClass(KeyValueSortImporter.class);
+        job.setReducerClass(KeyValueReducer.class);
+        Path outputDir = new Path(hfileOutPath);
+        FileOutputFormat.setOutputPath(job, outputDir);
+        job.setMapOutputKeyClass(KeyValueWritableComparable.class);
+        job.setMapOutputValueClass(KeyValue.class);
+        job.getConfiguration().setClass("mapreduce.job.output.key.comparator.class",
+            KeyValueWritableComparable.KeyValueWritableComparator.class,
+            RawComparator.class);
+        Path partitionsPath =
+            new Path(TotalOrderPartitioner.getPartitionFile(job.getConfiguration()));
+        FileSystem fs = FileSystem.get(job.getConfiguration());
+        fs.deleteOnExit(partitionsPath);
+        job.setPartitionerClass(KeyValueWritableComparablePartitioner.class);
+        job.setNumReduceTasks(regionLocator.getStartKeys().length);
+        TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(),
+            org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions.class);
+      }
+    } else if (hfileOutPath != null) {
+      LOG.info("Writing HFiles for bulk load.");
+      job.setMapperClass(KeyValueImporter.class);
+      try (Connection conn = ConnectionFactory.createConnection(conf);
+          Table table = conn.getTable(tableName);
+          RegionLocator regionLocator = conn.getRegionLocator(tableName)) {
+        job.setReducerClass(KeyValueSortReducer.class);
+        Path outputDir = new Path(hfileOutPath);
+        FileOutputFormat.setOutputPath(job, outputDir);
+        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
+        job.setMapOutputValueClass(KeyValue.class);
+        HFileOutputFormat2.configureIncrementalLoad(job, table.getDescriptor(), regionLocator);
+        TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(),
+            org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions.class);
+      }
+    } else {
+      LOG.info("Writing directly to table from Mapper.");
+      // No reducers. Just write straight to table. Call initTableReducerJob
+      // because it sets up the TableOutputFormat.
+      job.setMapperClass(Importer.class);
+      TableMapReduceUtil.initTableReducerJob(tableName.getNameAsString(), null, job);
+      job.setNumReduceTasks(0);
+    }
+    return job;
+  }
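
A minimal sketch of driving the job setup above from code, assuming a hypothetical table name and paths and that the BULK_OUTPUT_CONF_KEY constant is visible to callers (error handling elided); with the key set, the job writes HFiles for bulk load instead of Puts against the live table:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.mapreduce.Job;

    Configuration conf = HBaseConfiguration.create();
    // Hypothetical output path; setting this key selects the HFile branch above.
    conf.set(Import.BULK_OUTPUT_CONF_KEY, "/tmp/import-hfiles");
    Job job = Import.createSubmittableJob(conf, new String[] { "mytable", "/tmp/export" });
    boolean ok = job.waitForCompletion(true);
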
+
+  /*
+   * @param errorMsg Error message. Can be null.
+   */
+  private static void usage(final String errorMsg) {
+    if (errorMsg != null && errorMsg.length() > 0) {
+      System.err.println("ERROR: " + errorMsg);
+    }
+    System.err.println("Usage: Import [options] <tablename> <inputdir>");
+    System.err.println("By default Import will load data directly into HBase. To instead generate");
+    System.err.println("HFiles of data to prepare for a bulk data load, pass the option:");
+    System.err.println("  -D" + BULK_OUTPUT_CONF_KEY + "=/path/for/output");
+    System.err.println("If the input holds rows with very many KeyValues, the in-memory sort in"
+        + " the reducer can cause an OutOfMemoryError; in that case pass the option:");
+    System.err.println("  -D" + HAS_LARGE_RESULT + "=true");
+    System.err
+        .println(" To apply a generic org.apache.hadoop.hbase.filter.Filter to the input, use");
+    System.err.println("  -D" + FILTER_CLASS_CONF_KEY + "=<name of filter class>");
+    System.err.println("  -D" + FILTER_ARGS_CONF_KEY + "=<comma separated list of args for filter>");
+    System.err.println(" NOTE: The filter will be applied BEFORE doing column family renames via"
+        + " the " + CF_RENAME_PROP + " property. Further, filters will only use the"
+        + " Filter#filterRowKey(byte[] buffer, int offset, int length) method to identify"
+        + " whether the current row needs to be ignored completely for processing and the"
+        + " Filter#filterKeyValue(KeyValue) method to determine if the KeyValue should be added;"
+        + " Filter.ReturnCode#INCLUDE and #INCLUDE_AND_NEXT_COL will be considered as including"
+        + " the KeyValue.");
+    System.err.println("To import data exported from HBase 0.94, use");
+    System.err.println("  -Dhbase.import.version=0.94");
+    System.err.println("  -D" + JOB_NAME_CONF_KEY
+        + "=jobName - use the specified mapreduce job name for the import");
+    System.err.println("For performance consider the following options:\n"
+        + "  -Dmapreduce.map.speculative=false\n"
+        + "  -Dmapreduce.reduce.speculative=false\n"
+        + "  -D" + WAL_DURABILITY + "=<Used while writing data to hbase."
+        + " Allowed values are the supported durability values"
+        + " like SKIP_WAL/ASYNC_WAL/SYNC_WAL/...>");
+  }
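
A minimal sketch of the WAL durability switch from the options above, assuming the WAL_DURABILITY constant is visible to callers; the flush logic just below makes the skipped-WAL data durable:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.Durability;

    Configuration conf = HBaseConfiguration.create();
    // Skip the WAL while importing for speed; flushRegionsIfNecessary (below)
    // flushes the table's regions after a successful job so nothing is lost.
    conf.set(Import.WAL_DURABILITY, Durability.SKIP_WAL.name());
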
+
+  /**
+   * If the durability is set to {@link Durability#SKIP_WAL} and the data is imported to hbase, we
+   * need to flush all the regions of the table as the data is held in memory and is not
+   * present in the Write Ahead Log to replay in case of a crash. This method flushes all the
+   * regions of the table when data is imported to hbase with {@link Durability#SKIP_WAL}.
+   */
+  public static void flushRegionsIfNecessary(Configuration conf) throws IOException,
+      InterruptedException {
+    String tableName = conf.get(TABLE_NAME);
+    Admin hAdmin = null;
+    Connection connection = null;
+    String durability = conf.get(WAL_DURABILITY);
+    // Need to flush if the data is written to hbase and skip wal is enabled.
+    if (conf.get(BULK_OUTPUT_CONF_KEY) == null && durability != null
+        && Durability.SKIP_WAL.name().equalsIgnoreCase(durability)) {
+      LOG.info("Flushing all data that skipped the WAL.");
+      try {
+        connection = ConnectionFactory.createConnection(conf);
+        hAdmin = connection.getAdmin();
+        hAdmin.flush(TableName.valueOf(tableName));
+      } finally {
+        if (hAdmin != null) {
+          hAdmin.close();
+        }
+        if (connection != null) {
+          connection.close();
+        }
+      }
+    }
+  }
+
+  @Override
+  public int run(String[] args) throws Exception {
+    if (args.length < 2) {
+      usage("Wrong number of arguments: " + args.length);
+      return -1;
+    }
+    String inputVersionString = System.getProperty(ResultSerialization.IMPORT_FORMAT_VER);
+    if (inputVersionString != null) {
+      getConf().set(ResultSerialization.IMPORT_FORMAT_VER, inputVersionString);
+    }
+    Job job = createSubmittableJob(getConf(), args);
+    boolean isJobSuccessful = job.waitForCompletion(true);
+    if (isJobSuccessful) {
+      // Flush all the regions of the table
+      flushRegionsIfNecessary(getConf());
+    }
+    long inputRecords = job.getCounters().findCounter(TaskCounter.MAP_INPUT_RECORDS).getValue();
+    long outputRecords = job.getCounters().findCounter(TaskCounter.MAP_OUTPUT_RECORDS).getValue();
+    if (outputRecords < inputRecords) {
+      System.err.println("Warning: not all records were imported (maybe filtered out).");
+      if (outputRecords == 0) {
+        System.err.println("If the data was exported from HBase 0.94, "
+            + "consider using -Dhbase.import.version=0.94.");
+      }
+    }
+
+    return (isJobSuccessful ? 0 : 1);
+  }
+
+  /**
+   * Main entry point.
+   * @param args The command line parameters.
+   * @throws Exception When running the job fails.
+   */
+  public static void main(String[] args) throws Exception {
+    int errCode = ToolRunner.run(HBaseConfiguration.create(), new Import(), args);
+    System.exit(errCode);
+  }
+}
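
For a round-trip view of the pieces above, a minimal sketch of a caller that wires a filter into an import run; the table name and export path are hypothetical, and PrefixFilter stands in for any Filter exposing the createFilterFromArguments factory that instantiateFilter looks up:

    import java.util.Collections;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.filter.PrefixFilter;
    import org.apache.hadoop.util.ToolRunner;

    public class ImportDriver {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        // Only rows whose keys start with "row-" survive the import
        // (see instantiateFilter/filterKv and the filterRowKey note in usage()).
        Import.addFilterAndArguments(conf, PrefixFilter.class,
            Collections.singletonList("row-"));
        // Hypothetical table name and export directory.
        int exit = ToolRunner.run(conf, new Import(),
            new String[] { "mytable", "/export/mytable" });
        System.exit(exit);
      }
    }
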