http://git-wip-us.apache.org/repos/asf/hbase/blob/59d03410/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/CopyTable.java ---------------------------------------------------------------------- diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/CopyTable.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/CopyTable.java new file mode 100644 index 0000000..9cccf8c --- /dev/null +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/CopyTable.java @@ -0,0 +1,386 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mapreduce; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.Random; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; + +/** + * Tool used to copy a table to another one which can be on a different setup. + * It is also configurable with a start and time as well as a specification + * of the region server implementation if different from the local cluster. + */ [email protected] +public class CopyTable extends Configured implements Tool { + private static final Log LOG = LogFactory.getLog(CopyTable.class); + + final static String NAME = "copytable"; + long startTime = 0; + long endTime = HConstants.LATEST_TIMESTAMP; + int batch = Integer.MAX_VALUE; + int cacheRow = -1; + int versions = -1; + String tableName = null; + String startRow = null; + String stopRow = null; + String dstTableName = null; + String peerAddress = null; + String families = null; + boolean allCells = false; + static boolean shuffle = false; + + boolean bulkload = false; + Path bulkloadDir = null; + + private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name"; + + /** + * Sets up the actual job. + * + * @param args The command line parameters. + * @return The newly created job. + * @throws IOException When setting up the job fails. 
+ */ + public Job createSubmittableJob(String[] args) + throws IOException { + if (!doCommandLine(args)) { + return null; + } + + Job job = Job.getInstance(getConf(), getConf().get(JOB_NAME_CONF_KEY, NAME + "_" + tableName)); + job.setJarByClass(CopyTable.class); + Scan scan = new Scan(); + + scan.setBatch(batch); + scan.setCacheBlocks(false); + + if (cacheRow > 0) { + scan.setCaching(cacheRow); + } else { + scan.setCaching(getConf().getInt(HConstants.HBASE_CLIENT_SCANNER_CACHING, 100)); + } + + scan.setTimeRange(startTime, endTime); + + if (allCells) { + scan.setRaw(true); + } + if (shuffle) { + job.getConfiguration().set(TableInputFormat.SHUFFLE_MAPS, "true"); + } + if (versions >= 0) { + scan.setMaxVersions(versions); + } + + if (startRow != null) { + scan.setStartRow(Bytes.toBytesBinary(startRow)); + } + + if (stopRow != null) { + scan.setStopRow(Bytes.toBytesBinary(stopRow)); + } + + if(families != null) { + String[] fams = families.split(","); + Map<String,String> cfRenameMap = new HashMap<>(); + for(String fam : fams) { + String sourceCf; + if(fam.contains(":")) { + // fam looks like "sourceCfName:destCfName" + String[] srcAndDest = fam.split(":", 2); + sourceCf = srcAndDest[0]; + String destCf = srcAndDest[1]; + cfRenameMap.put(sourceCf, destCf); + } else { + // fam is just "sourceCf" + sourceCf = fam; + } + scan.addFamily(Bytes.toBytes(sourceCf)); + } + Import.configureCfRenaming(job.getConfiguration(), cfRenameMap); + } + job.setNumReduceTasks(0); + + if (bulkload) { + TableMapReduceUtil.initTableMapperJob(tableName, scan, Import.KeyValueImporter.class, null, + null, job); + + // We need to split the inputs by destination tables so that output of Map can be bulk-loaded. + TableInputFormat.configureSplitTable(job, TableName.valueOf(dstTableName)); + + FileSystem fs = FileSystem.get(getConf()); + Random rand = new Random(); + Path root = new Path(fs.getWorkingDirectory(), "copytable"); + fs.mkdirs(root); + while (true) { + bulkloadDir = new Path(root, "" + rand.nextLong()); + if (!fs.exists(bulkloadDir)) { + break; + } + } + + System.out.println("HFiles will be stored at " + this.bulkloadDir); + HFileOutputFormat2.setOutputPath(job, bulkloadDir); + try (Connection conn = ConnectionFactory.createConnection(getConf()); + Admin admin = conn.getAdmin()) { + HFileOutputFormat2.configureIncrementalLoadMap(job, + admin.listTableDescriptor((TableName.valueOf(dstTableName)))); + } + } else { + TableMapReduceUtil.initTableMapperJob(tableName, scan, + Import.Importer.class, null, null, job); + + TableMapReduceUtil.initTableReducerJob(dstTableName, null, job, null, peerAddress, null, + null); + } + + return job; + } + + /* + * @param errorMsg Error message. Can be null. 
+ */ + private static void printUsage(final String errorMsg) { + if (errorMsg != null && errorMsg.length() > 0) { + System.err.println("ERROR: " + errorMsg); + } + System.err.println("Usage: CopyTable [general options] [--starttime=X] [--endtime=Y] " + + "[--new.name=NEW] [--peer.adr=ADR] <tablename>"); + System.err.println(); + System.err.println("Options:"); + System.err.println(" rs.class hbase.regionserver.class of the peer cluster"); + System.err.println(" specify if different from current cluster"); + System.err.println(" rs.impl hbase.regionserver.impl of the peer cluster"); + System.err.println(" startrow the start row"); + System.err.println(" stoprow the stop row"); + System.err.println(" starttime beginning of the time range (unixtime in millis)"); + System.err.println(" without endtime means from starttime to forever"); + System.err.println(" endtime end of the time range. Ignored if no starttime specified."); + System.err.println(" versions number of cell versions to copy"); + System.err.println(" new.name new table's name"); + System.err.println(" peer.adr Address of the peer cluster given in the format"); + System.err.println(" hbase.zookeeper.quorum:hbase.zookeeper.client" + + ".port:zookeeper.znode.parent"); + System.err.println(" families comma-separated list of families to copy"); + System.err.println(" To copy from cf1 to cf2, give sourceCfName:destCfName. "); + System.err.println(" To keep the same name, just give \"cfName\""); + System.err.println(" all.cells also copy delete markers and deleted cells"); + System.err.println(" bulkload Write input into HFiles and bulk load to the destination " + + "table"); + System.err.println(); + System.err.println("Args:"); + System.err.println(" tablename Name of the table to copy"); + System.err.println(); + System.err.println("Examples:"); + System.err.println(" To copy 'TestTable' to a cluster that uses replication for a 1 hour window:"); + System.err.println(" $ hbase " + + "org.apache.hadoop.hbase.mapreduce.CopyTable --starttime=1265875194289 --endtime=1265878794289 " + + "--peer.adr=server1,server2,server3:2181:/hbase --families=myOldCf:myNewCf,cf2,cf3 TestTable "); + System.err.println("For performance consider the following general option:\n" + + " It is recommended that you set the following to >=100. A higher value uses more memory but\n" + + " decreases the round trip time to the server and may increase performance.\n" + + " -Dhbase.client.scanner.caching=100\n" + + " The following should always be set to false, to prevent writing data twice, which may produce \n" + + " inaccurate results.\n" + + " -Dmapreduce.map.speculative=false"); + } + + private boolean doCommandLine(final String[] args) { + // Process command-line args. TODO: Better cmd-line processing + // (but hopefully something not as painful as cli options). 
+ if (args.length < 1) { + printUsage(null); + return false; + } + try { + for (int i = 0; i < args.length; i++) { + String cmd = args[i]; + if (cmd.equals("-h") || cmd.startsWith("--h")) { + printUsage(null); + return false; + } + + final String startRowArgKey = "--startrow="; + if (cmd.startsWith(startRowArgKey)) { + startRow = cmd.substring(startRowArgKey.length()); + continue; + } + + final String stopRowArgKey = "--stoprow="; + if (cmd.startsWith(stopRowArgKey)) { + stopRow = cmd.substring(stopRowArgKey.length()); + continue; + } + + final String startTimeArgKey = "--starttime="; + if (cmd.startsWith(startTimeArgKey)) { + startTime = Long.parseLong(cmd.substring(startTimeArgKey.length())); + continue; + } + + final String endTimeArgKey = "--endtime="; + if (cmd.startsWith(endTimeArgKey)) { + endTime = Long.parseLong(cmd.substring(endTimeArgKey.length())); + continue; + } + + final String batchArgKey = "--batch="; + if (cmd.startsWith(batchArgKey)) { + batch = Integer.parseInt(cmd.substring(batchArgKey.length())); + continue; + } + + final String cacheRowArgKey = "--cacheRow="; + if (cmd.startsWith(cacheRowArgKey)) { + cacheRow = Integer.parseInt(cmd.substring(cacheRowArgKey.length())); + continue; + } + + final String versionsArgKey = "--versions="; + if (cmd.startsWith(versionsArgKey)) { + versions = Integer.parseInt(cmd.substring(versionsArgKey.length())); + continue; + } + + final String newNameArgKey = "--new.name="; + if (cmd.startsWith(newNameArgKey)) { + dstTableName = cmd.substring(newNameArgKey.length()); + continue; + } + + final String peerAdrArgKey = "--peer.adr="; + if (cmd.startsWith(peerAdrArgKey)) { + peerAddress = cmd.substring(peerAdrArgKey.length()); + continue; + } + + final String familiesArgKey = "--families="; + if (cmd.startsWith(familiesArgKey)) { + families = cmd.substring(familiesArgKey.length()); + continue; + } + + if (cmd.startsWith("--all.cells")) { + allCells = true; + continue; + } + + if (cmd.startsWith("--bulkload")) { + bulkload = true; + continue; + } + + if (cmd.startsWith("--shuffle")) { + shuffle = true; + continue; + } + + if (i == args.length-1) { + tableName = cmd; + } else { + printUsage("Invalid argument '" + cmd + "'"); + return false; + } + } + if (dstTableName == null && peerAddress == null) { + printUsage("At least a new table name or a " + + "peer address must be specified"); + return false; + } + if ((endTime != 0) && (startTime > endTime)) { + printUsage("Invalid time range filter: starttime=" + startTime + " > endtime=" + endTime); + return false; + } + + if (bulkload && peerAddress != null) { + printUsage("Remote bulkload is not supported!"); + return false; + } + + // set dstTableName if necessary + if (dstTableName == null) { + dstTableName = tableName; + } + } catch (Exception e) { + e.printStackTrace(); + printUsage("Can't start because " + e.getMessage()); + return false; + } + return true; + } + + /** + * Main entry point. + * + * @param args The command line parameters. + * @throws Exception When running the job fails. 
+ */ + public static void main(String[] args) throws Exception { + int ret = ToolRunner.run(HBaseConfiguration.create(), new CopyTable(), args); + System.exit(ret); + } + + @Override + public int run(String[] args) throws Exception { + Job job = createSubmittableJob(args); + if (job == null) return 1; + if (!job.waitForCompletion(true)) { + LOG.info("Map-reduce job failed!"); + if (bulkload) { + LOG.info("Files are not bulkloaded!"); + } + return 1; + } + int code = 0; + if (bulkload) { + code = new LoadIncrementalHFiles(this.getConf()).run(new String[]{this.bulkloadDir.toString(), + this.dstTableName}); + if (code == 0) { + // bulkloadDir is deleted only when LoadIncrementalHFiles was successful, so that one can rerun + // LoadIncrementalHFiles from the generated HFiles if the bulk load failed. + FileSystem fs = FileSystem.get(this.getConf()); + if (!fs.delete(this.bulkloadDir, true)) { + LOG.error("Deleting folder " + bulkloadDir + " failed!"); + code = 1; + } + } + } + return code; + } +}
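For reference, CopyTable is driven through ToolRunner, either from the hbase shell command shown in printUsage() above or programmatically. Below is a minimal sketch of a programmatic invocation; the wrapper class name, table names, time range and family mapping are illustrative placeholders, not values taken from this patch.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.mapreduce.CopyTable;
    import org.apache.hadoop.util.ToolRunner;

    // Hypothetical driver class, shown only to illustrate how the tool is invoked.
    public class CopyTableExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        // Copy a one-hour window of 'TestTable' into 'TestTableBackup',
        // renaming family myOldCf to myNewCf. All values are hypothetical;
        // see printUsage() in CopyTable for the full option list.
        String[] copyArgs = new String[] {
            "--starttime=1265875194289",
            "--endtime=1265878794289",
            "--new.name=TestTableBackup",
            "--families=myOldCf:myNewCf,cf2",
            "TestTable"
        };
        int exitCode = ToolRunner.run(conf, new CopyTable(), copyArgs);
        System.exit(exitCode);
      }
    }

The same arguments work from the command line via "hbase org.apache.hadoop.hbase.mapreduce.CopyTable ..."; adding --bulkload makes run() write HFiles under a temporary directory and load them with LoadIncrementalHFiles instead of writing through the destination table directly, as the code above shows.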
http://git-wip-us.apache.org/repos/asf/hbase/blob/59d03410/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/DefaultVisibilityExpressionResolver.java ---------------------------------------------------------------------- diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/DefaultVisibilityExpressionResolver.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/DefaultVisibilityExpressionResolver.java new file mode 100644 index 0000000..004ee5c --- /dev/null +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/DefaultVisibilityExpressionResolver.java @@ -0,0 +1,144 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mapreduce; + +import static org.apache.hadoop.hbase.security.visibility.VisibilityConstants.LABELS_TABLE_FAMILY; +import static org.apache.hadoop.hbase.security.visibility.VisibilityConstants.LABELS_TABLE_NAME; +import static org.apache.hadoop.hbase.security.visibility.VisibilityConstants.LABEL_QUALIFIER; + +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.TableNotFoundException; +import org.apache.hadoop.hbase.Tag; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.security.visibility.Authorizations; +import org.apache.hadoop.hbase.security.visibility.VisibilityConstants; +import org.apache.hadoop.hbase.security.visibility.VisibilityLabelOrdinalProvider; +import org.apache.hadoop.hbase.security.visibility.VisibilityUtils; +import org.apache.hadoop.hbase.util.Bytes; + +/** + * This implementation creates tags by expanding expression using label ordinal. Labels will be + * serialized in sorted order of it's ordinal. + */ [email protected] +public class DefaultVisibilityExpressionResolver implements VisibilityExpressionResolver { + private static final Log LOG = LogFactory.getLog(DefaultVisibilityExpressionResolver.class); + + private Configuration conf; + private final Map<String, Integer> labels = new HashMap<>(); + + @Override + public Configuration getConf() { + return this.conf; + } + + @Override + public void setConf(Configuration conf) { + this.conf = conf; + } + + @Override + public void init() { + // Reading all the labels and ordinal. 
+ // This scan should be done by user with global_admin privileges.. Ensure that it works + Table labelsTable = null; + Connection connection = null; + try { + connection = ConnectionFactory.createConnection(conf); + try { + labelsTable = connection.getTable(LABELS_TABLE_NAME); + } catch (IOException e) { + LOG.error("Error opening 'labels' table", e); + return; + } + Scan scan = new Scan(); + scan.setAuthorizations(new Authorizations(VisibilityUtils.SYSTEM_LABEL)); + scan.addColumn(LABELS_TABLE_FAMILY, LABEL_QUALIFIER); + ResultScanner scanner = null; + try { + scanner = labelsTable.getScanner(scan); + Result next = null; + while ((next = scanner.next()) != null) { + byte[] row = next.getRow(); + byte[] value = next.getValue(LABELS_TABLE_FAMILY, LABEL_QUALIFIER); + labels.put(Bytes.toString(value), Bytes.toInt(row)); + } + } catch (TableNotFoundException e) { + // Table not found. So just return + return; + } catch (IOException e) { + LOG.error("Error scanning 'labels' table", e); + } finally { + if (scanner != null) scanner.close(); + } + } catch (IOException ioe) { + LOG.error("Failed reading 'labels' tags", ioe); + return; + } finally { + if (labelsTable != null) { + try { + labelsTable.close(); + } catch (IOException ioe) { + LOG.warn("Error closing 'labels' table", ioe); + } + } + if (connection != null) + try { + connection.close(); + } catch (IOException ioe) { + LOG.warn("Failed close of temporary connection", ioe); + } + } + } + + @Override + public List<Tag> createVisibilityExpTags(String visExpression) throws IOException { + VisibilityLabelOrdinalProvider provider = new VisibilityLabelOrdinalProvider() { + @Override + public int getLabelOrdinal(String label) { + Integer ordinal = null; + ordinal = labels.get(label); + if (ordinal != null) { + return ordinal.intValue(); + } + return VisibilityConstants.NON_EXIST_LABEL_ORDINAL; + } + + @Override + public String getLabel(int ordinal) { + // Unused + throw new UnsupportedOperationException( + "getLabel should not be used in VisibilityExpressionResolver"); + } + }; + return VisibilityUtils.createVisibilityExpTags(visExpression, true, false, null, provider); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/59d03410/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/Driver.java ---------------------------------------------------------------------- diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/Driver.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/Driver.java new file mode 100644 index 0000000..9737b55 --- /dev/null +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/Driver.java @@ -0,0 +1,64 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.mapreduce; + +import org.apache.hadoop.hbase.HBaseInterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication; +import org.apache.hadoop.hbase.snapshot.ExportSnapshot; +import org.apache.hadoop.util.ProgramDriver; + +/** + * Driver for hbase mapreduce jobs. Select which to run by passing + * name of job to this main. + */ [email protected](HBaseInterfaceAudience.TOOLS) [email protected] +public class Driver { + /** + * @param args + * @throws Throwable + */ + public static void main(String[] args) throws Throwable { + ProgramDriver pgd = new ProgramDriver(); + + pgd.addClass(RowCounter.NAME, RowCounter.class, + "Count rows in HBase table."); + pgd.addClass(CellCounter.NAME, CellCounter.class, + "Count cells in HBase table."); + pgd.addClass(Export.NAME, Export.class, "Write table data to HDFS."); + pgd.addClass(Import.NAME, Import.class, "Import data written by Export."); + pgd.addClass(ImportTsv.NAME, ImportTsv.class, "Import data in TSV format."); + pgd.addClass(LoadIncrementalHFiles.NAME, LoadIncrementalHFiles.class, + "Complete a bulk data load."); + pgd.addClass(CopyTable.NAME, CopyTable.class, + "Export a table from local cluster to peer cluster."); + pgd.addClass(VerifyReplication.NAME, VerifyReplication.class, "Compare" + + " the data from tables in two different clusters. WARNING: It" + + " doesn't work for incrementColumnValues'd cells since the" + + " timestamp is changed after being appended to the log."); + pgd.addClass(WALPlayer.NAME, WALPlayer.class, "Replay WAL files."); + pgd.addClass(ExportSnapshot.NAME, ExportSnapshot.class, "Export" + + " the specific snapshot to a given FileSystem."); + + ProgramDriver.class.getMethod("driver", new Class [] {String[].class}). + invoke(pgd, new Object[]{args}); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/59d03410/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/Export.java ---------------------------------------------------------------------- diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/Export.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/Export.java new file mode 100644 index 0000000..de6cf3a --- /dev/null +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/Export.java @@ -0,0 +1,197 @@ +/** +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. 
+*/ +package org.apache.hadoop.hbase.mapreduce; + +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; +import org.apache.hadoop.hbase.filter.Filter; +import org.apache.hadoop.hbase.filter.IncompatibleFilterException; +import org.apache.hadoop.hbase.filter.PrefixFilter; +import org.apache.hadoop.hbase.filter.RegexStringComparator; +import org.apache.hadoop.hbase.filter.RowFilter; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; + +/** + * Export an HBase table. + * Writes content to sequence files up in HDFS. Use {@link Import} to read it + * back in again. + */ [email protected] +public class Export extends Configured implements Tool { + private static final Log LOG = LogFactory.getLog(Export.class); + final static String NAME = "export"; + final static String RAW_SCAN = "hbase.mapreduce.include.deleted.rows"; + final static String EXPORT_BATCHING = "hbase.export.scanner.batch"; + + private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name"; + + /** + * Sets up the actual job. + * + * @param conf The current configuration. + * @param args The command line parameters. + * @return The newly created job. + * @throws IOException When setting up the job fails. + */ + public static Job createSubmittableJob(Configuration conf, String[] args) + throws IOException { + String tableName = args[0]; + Path outputDir = new Path(args[1]); + Job job = Job.getInstance(conf, conf.get(JOB_NAME_CONF_KEY, NAME + "_" + tableName)); + job.setJobName(NAME + "_" + tableName); + job.setJarByClass(Export.class); + // Set optional scan parameters + Scan s = getConfiguredScanForJob(conf, args); + IdentityTableMapper.initJob(tableName, s, IdentityTableMapper.class, job); + // No reducers. Just write straight to output files. + job.setNumReduceTasks(0); + job.setOutputFormatClass(SequenceFileOutputFormat.class); + job.setOutputKeyClass(ImmutableBytesWritable.class); + job.setOutputValueClass(Result.class); + FileOutputFormat.setOutputPath(job, outputDir); // job conf doesn't contain the conf so doesn't have a default fs. + return job; + } + + private static Scan getConfiguredScanForJob(Configuration conf, String[] args) throws IOException { + Scan s = new Scan(); + // Optional arguments. + // Set Scan Versions + int versions = args.length > 2? Integer.parseInt(args[2]): 1; + s.setMaxVersions(versions); + // Set Scan Range + long startTime = args.length > 3? Long.parseLong(args[3]): 0L; + long endTime = args.length > 4? 
Long.parseLong(args[4]): Long.MAX_VALUE; + s.setTimeRange(startTime, endTime); + // Set cache blocks + s.setCacheBlocks(false); + // set Start and Stop row + if (conf.get(TableInputFormat.SCAN_ROW_START) != null) { + s.setStartRow(Bytes.toBytesBinary(conf.get(TableInputFormat.SCAN_ROW_START))); + } + if (conf.get(TableInputFormat.SCAN_ROW_STOP) != null) { + s.setStopRow(Bytes.toBytesBinary(conf.get(TableInputFormat.SCAN_ROW_STOP))); + } + // Set Scan Column Family + boolean raw = Boolean.parseBoolean(conf.get(RAW_SCAN)); + if (raw) { + s.setRaw(raw); + } + for (String columnFamily : conf.getTrimmedStrings(TableInputFormat.SCAN_COLUMN_FAMILY)) { + s.addFamily(Bytes.toBytes(columnFamily)); + } + // Set RowFilter or Prefix Filter if applicable. + Filter exportFilter = getExportFilter(args); + if (exportFilter!= null) { + LOG.info("Setting Scan Filter for Export."); + s.setFilter(exportFilter); + } + + int batching = conf.getInt(EXPORT_BATCHING, -1); + if (batching != -1){ + try { + s.setBatch(batching); + } catch (IncompatibleFilterException e) { + LOG.error("Batching could not be set", e); + } + } + LOG.info("versions=" + versions + ", starttime=" + startTime + + ", endtime=" + endTime + ", keepDeletedCells=" + raw); + return s; + } + + private static Filter getExportFilter(String[] args) { + Filter exportFilter = null; + String filterCriteria = (args.length > 5) ? args[5]: null; + if (filterCriteria == null) return null; + if (filterCriteria.startsWith("^")) { + String regexPattern = filterCriteria.substring(1, filterCriteria.length()); + exportFilter = new RowFilter(CompareOp.EQUAL, new RegexStringComparator(regexPattern)); + } else { + exportFilter = new PrefixFilter(Bytes.toBytesBinary(filterCriteria)); + } + return exportFilter; + } + + /* + * @param errorMsg Error message. Can be null. + */ + private static void usage(final String errorMsg) { + if (errorMsg != null && errorMsg.length() > 0) { + System.err.println("ERROR: " + errorMsg); + } + System.err.println("Usage: Export [-D <property=value>]* <tablename> <outputdir> [<versions> " + + "[<starttime> [<endtime>]] [^[regex pattern] or [Prefix] to filter]]\n"); + System.err.println(" Note: -D properties will be applied to the conf used. 
"); + System.err.println(" For example: "); + System.err.println(" -D mapreduce.output.fileoutputformat.compress=true"); + System.err.println(" -D mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.GzipCodec"); + System.err.println(" -D mapreduce.output.fileoutputformat.compress.type=BLOCK"); + System.err.println(" Additionally, the following SCAN properties can be specified"); + System.err.println(" to control/limit what is exported.."); + System.err.println(" -D " + TableInputFormat.SCAN_COLUMN_FAMILY + "=<family1>,<family2>, ..."); + System.err.println(" -D " + RAW_SCAN + "=true"); + System.err.println(" -D " + TableInputFormat.SCAN_ROW_START + "=<ROWSTART>"); + System.err.println(" -D " + TableInputFormat.SCAN_ROW_STOP + "=<ROWSTOP>"); + System.err.println(" -D " + JOB_NAME_CONF_KEY + + "=jobName - use the specified mapreduce job name for the export"); + System.err.println("For performance consider the following properties:\n" + + " -Dhbase.client.scanner.caching=100\n" + + " -Dmapreduce.map.speculative=false\n" + + " -Dmapreduce.reduce.speculative=false"); + System.err.println("For tables with very wide rows consider setting the batch size as below:\n" + + " -D" + EXPORT_BATCHING + "=10"); + } + + + @Override + public int run(String[] args) throws Exception { + if (args.length < 2) { + usage("Wrong number of arguments: " + args.length); + return -1; + } + Job job = createSubmittableJob(getConf(), args); + return (job.waitForCompletion(true) ? 0 : 1); + } + + /** + * Main entry point. + * @param args The command line parameters. + * @throws Exception When running the job fails. + */ + public static void main(String[] args) throws Exception { + int errCode = ToolRunner.run(HBaseConfiguration.create(), new Export(), args); + System.exit(errCode); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/59d03410/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/GroupingTableMapper.java ---------------------------------------------------------------------- diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/GroupingTableMapper.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/GroupingTableMapper.java new file mode 100644 index 0000000..dc30c6e --- /dev/null +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/GroupingTableMapper.java @@ -0,0 +1,177 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.mapreduce; + +import java.io.IOException; +import java.util.ArrayList; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.mapreduce.Job; + +/** + * Extract grouping columns from input record. + */ [email protected] +public class GroupingTableMapper +extends TableMapper<ImmutableBytesWritable,Result> implements Configurable { + + /** + * JobConf parameter to specify the columns used to produce the key passed to + * collect from the map phase. + */ + public static final String GROUP_COLUMNS = + "hbase.mapred.groupingtablemap.columns"; + + /** The grouping columns. */ + protected byte [][] columns; + /** The current configuration. */ + private Configuration conf = null; + + /** + * Use this before submitting a TableMap job. It will appropriately set up + * the job. + * + * @param table The table to be processed. + * @param scan The scan with the columns etc. + * @param groupColumns A space separated list of columns used to form the + * key used in collect. + * @param mapper The mapper class. + * @param job The current job. + * @throws IOException When setting up the job fails. + */ + @SuppressWarnings("unchecked") + public static void initJob(String table, Scan scan, String groupColumns, + Class<? extends TableMapper> mapper, Job job) throws IOException { + TableMapReduceUtil.initTableMapperJob(table, scan, mapper, + ImmutableBytesWritable.class, Result.class, job); + job.getConfiguration().set(GROUP_COLUMNS, groupColumns); + } + + /** + * Extract the grouping columns from value to construct a new key. Pass the + * new key and value to reduce. If any of the grouping columns are not found + * in the value, the record is skipped. + * + * @param key The current key. + * @param value The current value. + * @param context The current context. + * @throws IOException When writing the record fails. + * @throws InterruptedException When the job is aborted. + */ + @Override + public void map(ImmutableBytesWritable key, Result value, Context context) + throws IOException, InterruptedException { + byte[][] keyVals = extractKeyValues(value); + if(keyVals != null) { + ImmutableBytesWritable tKey = createGroupKey(keyVals); + context.write(tKey, value); + } + } + + /** + * Extract columns values from the current record. This method returns + * null if any of the columns are not found. + * <p> + * Override this method if you want to deal with nulls differently. + * + * @param r The current values. + * @return Array of byte values. 
+ */ + protected byte[][] extractKeyValues(Result r) { + byte[][] keyVals = null; + ArrayList<byte[]> foundList = new ArrayList<>(); + int numCols = columns.length; + if (numCols > 0) { + for (Cell value: r.listCells()) { + byte [] column = KeyValue.makeColumn(CellUtil.cloneFamily(value), + CellUtil.cloneQualifier(value)); + for (int i = 0; i < numCols; i++) { + if (Bytes.equals(column, columns[i])) { + foundList.add(CellUtil.cloneValue(value)); + break; + } + } + } + if(foundList.size() == numCols) { + keyVals = foundList.toArray(new byte[numCols][]); + } + } + return keyVals; + } + + /** + * Create a key by concatenating multiple column values. + * <p> + * Override this function in order to produce different types of keys. + * + * @param vals The current key/values. + * @return A key generated by concatenating multiple column values. + */ + protected ImmutableBytesWritable createGroupKey(byte[][] vals) { + if(vals == null) { + return null; + } + StringBuilder sb = new StringBuilder(); + for(int i = 0; i < vals.length; i++) { + if(i > 0) { + sb.append(" "); + } + sb.append(Bytes.toString(vals[i])); + } + return new ImmutableBytesWritable(Bytes.toBytesBinary(sb.toString())); + } + + /** + * Returns the current configuration. + * + * @return The current configuration. + * @see org.apache.hadoop.conf.Configurable#getConf() + */ + @Override + public Configuration getConf() { + return conf; + } + + /** + * Sets the configuration. This is used to set up the grouping details. + * + * @param configuration The configuration to set. + * @see org.apache.hadoop.conf.Configurable#setConf( + * org.apache.hadoop.conf.Configuration) + */ + @Override + public void setConf(Configuration configuration) { + this.conf = configuration; + String[] cols = conf.get(GROUP_COLUMNS, "").split(" "); + columns = new byte[cols.length][]; + for(int i = 0; i < cols.length; i++) { + columns[i] = Bytes.toBytes(cols[i]); + } + } + +} http://git-wip-us.apache.org/repos/asf/hbase/blob/59d03410/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileInputFormat.java ---------------------------------------------------------------------- diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileInputFormat.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileInputFormat.java new file mode 100644 index 0000000..e90d5c1 --- /dev/null +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileInputFormat.java @@ -0,0 +1,174 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.mapreduce; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.io.hfile.HFile; +import org.apache.hadoop.hbase.io.hfile.HFile.Reader; +import org.apache.hadoop.hbase.io.hfile.HFileScanner; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.mapreduce.lib.input.FileSplit; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Simple MR input format for HFiles. + * This code was borrowed from Apache Crunch project. + * Updated to the recent version of HBase. + */ +public class HFileInputFormat extends FileInputFormat<NullWritable, Cell> { + + private static final Logger LOG = LoggerFactory.getLogger(HFileInputFormat.class); + + /** + * File filter that removes all "hidden" files. This might be something worth removing from + * a more general purpose utility; it accounts for the presence of metadata files created + * in the way we're doing exports. + */ + static final PathFilter HIDDEN_FILE_FILTER = new PathFilter() { + @Override + public boolean accept(Path p) { + String name = p.getName(); + return !name.startsWith("_") && !name.startsWith("."); + } + }; + + /** + * Record reader for HFiles. + */ + private static class HFileRecordReader extends RecordReader<NullWritable, Cell> { + + private Reader in; + protected Configuration conf; + private HFileScanner scanner; + + /** + * A private cache of the key value so it doesn't need to be loaded twice from the scanner. + */ + private Cell value = null; + private long count; + private boolean seeked = false; + + @Override + public void initialize(InputSplit split, TaskAttemptContext context) + throws IOException, InterruptedException { + FileSplit fileSplit = (FileSplit) split; + conf = context.getConfiguration(); + Path path = fileSplit.getPath(); + FileSystem fs = path.getFileSystem(conf); + LOG.info("Initialize HFileRecordReader for {}", path); + this.in = HFile.createReader(fs, path, conf); + + // The file info must be loaded before the scanner can be used. + // This seems like a bug in HBase, but it's easily worked around. + this.in.loadFileInfo(); + this.scanner = in.getScanner(false, false); + + } + + + @Override + public boolean nextKeyValue() throws IOException, InterruptedException { + boolean hasNext; + if (!seeked) { + LOG.info("Seeking to start"); + hasNext = scanner.seekTo(); + seeked = true; + } else { + hasNext = scanner.next(); + } + if (!hasNext) { + return false; + } + value = scanner.getCell(); + count++; + return true; + } + + @Override + public NullWritable getCurrentKey() throws IOException, InterruptedException { + return NullWritable.get(); + } + + @Override + public Cell getCurrentValue() throws IOException, InterruptedException { + return value; + } + + @Override + public float getProgress() throws IOException, InterruptedException { + // This would be inaccurate if KVs are not uniformly-sized or we have performed a seek to + // the start row, but better than nothing anyway. 
+ return 1.0f * count / in.getEntries(); + } + + @Override + public void close() throws IOException { + if (in != null) { + in.close(); + in = null; + } + } + } + + @Override + protected List<FileStatus> listStatus(JobContext job) throws IOException { + List<FileStatus> result = new ArrayList<FileStatus>(); + + // Explode out directories that match the original FileInputFormat filters + // since HFiles are written to directories where the + // directory name is the column name + for (FileStatus status : super.listStatus(job)) { + if (status.isDirectory()) { + FileSystem fs = status.getPath().getFileSystem(job.getConfiguration()); + for (FileStatus match : fs.listStatus(status.getPath(), HIDDEN_FILE_FILTER)) { + result.add(match); + } + } else { + result.add(status); + } + } + return result; + } + + @Override + public RecordReader<NullWritable, Cell> createRecordReader(InputSplit split, TaskAttemptContext context) + throws IOException, InterruptedException { + return new HFileRecordReader(); + } + + @Override + protected boolean isSplitable(JobContext context, Path filename) { + // This file isn't splittable. + return false; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/59d03410/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java ---------------------------------------------------------------------- diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java new file mode 100644 index 0000000..7fea254 --- /dev/null +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java @@ -0,0 +1,902 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.mapreduce; + +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.net.InetSocketAddress; +import java.net.URLDecoder; +import java.net.URLEncoder; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; +import java.util.TreeSet; +import java.util.UUID; +import java.util.function.Function; +import java.util.stream.Collectors; + +import org.apache.commons.lang.StringUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellComparator; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.TableDescriptor; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.RegionLocator; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.fs.HFileSystem; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.io.compress.Compression; +import org.apache.hadoop.hbase.io.compress.Compression.Algorithm; +import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; +import org.apache.hadoop.hbase.io.hfile.CacheConfig; +import org.apache.hadoop.hbase.io.hfile.HFile; +import org.apache.hadoop.hbase.io.hfile.HFileContext; +import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; +import org.apache.hadoop.hbase.io.hfile.HFileWriterImpl; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.KeyValueUtil; +import org.apache.hadoop.hbase.regionserver.BloomType; +import org.apache.hadoop.hbase.regionserver.HStore; +import org.apache.hadoop.hbase.regionserver.StoreFile; +import org.apache.hadoop.hbase.regionserver.StoreFileWriter; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.OutputFormat; +import org.apache.hadoop.mapreduce.RecordWriter; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner; + +import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting; + +/** + * Writes HFiles. Passed Cells must arrive in order. + * Writes current time as the sequence id for the file. Sets the major compacted + * attribute on created @{link {@link HFile}s. Calling write(null,null) will forcibly roll + * all HFiles being written. 
+ * <p> + * Using this class as part of a MapReduce job is best done + * using {@link #configureIncrementalLoad(Job, TableDescriptor, RegionLocator)}. + */ [email protected] +public class HFileOutputFormat2 + extends FileOutputFormat<ImmutableBytesWritable, Cell> { + private static final Log LOG = LogFactory.getLog(HFileOutputFormat2.class); + static class TableInfo { + private TableDescriptor tableDesctiptor; + private RegionLocator regionLocator; + + public TableInfo(TableDescriptor tableDesctiptor, RegionLocator regionLocator) { + this.tableDesctiptor = tableDesctiptor; + this.regionLocator = regionLocator; + } + + /** + * The modification for the returned HTD doesn't affect the inner TD. + * @return A clone of inner table descriptor + * @deprecated use {@link #getTableDescriptor} + */ + @Deprecated + public HTableDescriptor getHTableDescriptor() { + return new HTableDescriptor(tableDesctiptor); + } + + public TableDescriptor getTableDescriptor() { + return tableDesctiptor; + } + + public RegionLocator getRegionLocator() { + return regionLocator; + } + } + + protected static final byte[] tableSeparator = ";".getBytes(StandardCharsets.UTF_8); + + protected static byte[] combineTableNameSuffix(byte[] tableName, + byte[] suffix ) { + return Bytes.add(tableName, tableSeparator, suffix); + } + + // The following constants are private since these are used by + // HFileOutputFormat2 to internally transfer data between job setup and + // reducer run using conf. + // These should not be changed by the client. + static final String COMPRESSION_FAMILIES_CONF_KEY = + "hbase.hfileoutputformat.families.compression"; + static final String BLOOM_TYPE_FAMILIES_CONF_KEY = + "hbase.hfileoutputformat.families.bloomtype"; + static final String BLOCK_SIZE_FAMILIES_CONF_KEY = + "hbase.mapreduce.hfileoutputformat.blocksize"; + static final String DATABLOCK_ENCODING_FAMILIES_CONF_KEY = + "hbase.mapreduce.hfileoutputformat.families.datablock.encoding"; + + // This constant is public since the client can modify this when setting + // up their conf object and thus refer to this symbol. + // It is present for backwards compatibility reasons. Use it only to + // override the auto-detection of datablock encoding. + public static final String DATABLOCK_ENCODING_OVERRIDE_CONF_KEY = + "hbase.mapreduce.hfileoutputformat.datablock.encoding"; + + /** + * Keep locality while generating HFiles for bulkload. 
See HBASE-12596 + */ + public static final String LOCALITY_SENSITIVE_CONF_KEY = + "hbase.bulkload.locality.sensitive.enabled"; + private static final boolean DEFAULT_LOCALITY_SENSITIVE = true; + static final String OUTPUT_TABLE_NAME_CONF_KEY = + "hbase.mapreduce.hfileoutputformat.table.name"; + static final String MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY = + "hbase.mapreduce.use.multi.table.hfileoutputformat"; + + public static final String STORAGE_POLICY_PROPERTY = "hbase.hstore.storagepolicy"; + public static final String STORAGE_POLICY_PROPERTY_CF_PREFIX = STORAGE_POLICY_PROPERTY + "."; + + @Override + public RecordWriter<ImmutableBytesWritable, Cell> getRecordWriter( + final TaskAttemptContext context) throws IOException, InterruptedException { + return createRecordWriter(context); + } + + protected static byte[] getTableNameSuffixedWithFamily(byte[] tableName, byte[] family) { + return combineTableNameSuffix(tableName, family); + } + + static <V extends Cell> RecordWriter<ImmutableBytesWritable, V> + createRecordWriter(final TaskAttemptContext context) + throws IOException { + + // Get the path of the temporary output file + final Path outputPath = FileOutputFormat.getOutputPath(context); + final Path outputDir = new FileOutputCommitter(outputPath, context).getWorkPath(); + final Configuration conf = context.getConfiguration(); + final boolean writeMultipleTables = conf.getBoolean(MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY, false) ; + final String writeTableNames = conf.get(OUTPUT_TABLE_NAME_CONF_KEY); + if (writeTableNames==null || writeTableNames.isEmpty()) { + throw new IllegalArgumentException("Configuration parameter " + OUTPUT_TABLE_NAME_CONF_KEY + + " cannot be empty"); + } + final FileSystem fs = outputDir.getFileSystem(conf); + // These configs. are from hbase-*.xml + final long maxsize = conf.getLong(HConstants.HREGION_MAX_FILESIZE, + HConstants.DEFAULT_MAX_FILE_SIZE); + // Invented config. Add to hbase-*.xml if other than default compression. + final String defaultCompressionStr = conf.get("hfile.compression", + Compression.Algorithm.NONE.getName()); + final Algorithm defaultCompression = HFileWriterImpl + .compressionByName(defaultCompressionStr); + final boolean compactionExclude = conf.getBoolean( + "hbase.mapreduce.hfileoutputformat.compaction.exclude", false); + + final Set<String> allTableNames = Arrays.stream(writeTableNames.split( + Bytes.toString(tableSeparator))).collect(Collectors.toSet()); + + // create a map from column family to the compression algorithm + final Map<byte[], Algorithm> compressionMap = createFamilyCompressionMap(conf); + final Map<byte[], BloomType> bloomTypeMap = createFamilyBloomTypeMap(conf); + final Map<byte[], Integer> blockSizeMap = createFamilyBlockSizeMap(conf); + + String dataBlockEncodingStr = conf.get(DATABLOCK_ENCODING_OVERRIDE_CONF_KEY); + final Map<byte[], DataBlockEncoding> datablockEncodingMap + = createFamilyDataBlockEncodingMap(conf); + final DataBlockEncoding overriddenEncoding; + if (dataBlockEncodingStr != null) { + overriddenEncoding = DataBlockEncoding.valueOf(dataBlockEncodingStr); + } else { + overriddenEncoding = null; + } + + return new RecordWriter<ImmutableBytesWritable, V>() { + // Map of families to writers and how much has been output on the writer. 
+ private final Map<byte[], WriterLength> writers = + new TreeMap<>(Bytes.BYTES_COMPARATOR); + private byte[] previousRow = HConstants.EMPTY_BYTE_ARRAY; + private final byte[] now = Bytes.toBytes(EnvironmentEdgeManager.currentTime()); + private boolean rollRequested = false; + + @Override + public void write(ImmutableBytesWritable row, V cell) + throws IOException { + KeyValue kv = KeyValueUtil.ensureKeyValue(cell); + + // null input == user explicitly wants to flush + if (row == null && kv == null) { + rollWriters(); + return; + } + + byte[] rowKey = CellUtil.cloneRow(kv); + long length = kv.getLength(); + byte[] family = CellUtil.cloneFamily(kv); + byte[] tableNameBytes = null; + if (writeMultipleTables) { + tableNameBytes = MultiTableHFileOutputFormat.getTableName(row.get()); + if (!allTableNames.contains(Bytes.toString(tableNameBytes))) { + throw new IllegalArgumentException("TableName '" + Bytes.toString(tableNameBytes) + + "' not" + " expected"); + } + } else { + tableNameBytes = writeTableNames.getBytes(StandardCharsets.UTF_8); + } + byte[] tableAndFamily = getTableNameSuffixedWithFamily(tableNameBytes, family); + WriterLength wl = this.writers.get(tableAndFamily); + + // If this is a new column family, verify that the directory exists + if (wl == null) { + Path writerPath = null; + if (writeMultipleTables) { + writerPath = new Path(outputDir, new Path(Bytes.toString(tableNameBytes), Bytes + .toString(family))); + } + else { + writerPath = new Path(outputDir, Bytes.toString(family)); + } + fs.mkdirs(writerPath); + configureStoragePolicy(conf, fs, tableAndFamily, writerPath); + } + + // If any of the HFiles for the column families has reached + // maxsize, we need to roll all the writers + if (wl != null && wl.written + length >= maxsize) { + this.rollRequested = true; + } + + // This can only happen once a row is finished though + if (rollRequested && Bytes.compareTo(this.previousRow, rowKey) != 0) { + rollWriters(); + } + + // create a new WAL writer, if necessary + if (wl == null || wl.writer == null) { + if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) { + HRegionLocation loc = null; + + String tableName = Bytes.toString(tableNameBytes); + if (tableName != null) { + try (Connection connection = ConnectionFactory.createConnection(conf); + RegionLocator locator = + connection.getRegionLocator(TableName.valueOf(tableName))) { + loc = locator.getRegionLocation(rowKey); + } catch (Throwable e) { + LOG.warn("There's something wrong when locating rowkey: " + + Bytes.toString(rowKey) + " for tablename: " + tableName, e); + loc = null; + } } + + if (null == loc) { + if (LOG.isTraceEnabled()) { + LOG.trace("failed to get region location, so use default writer for rowkey: " + + Bytes.toString(rowKey)); + } + wl = getNewWriter(tableNameBytes, family, conf, null); + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("first rowkey: [" + Bytes.toString(rowKey) + "]"); + } + InetSocketAddress initialIsa = + new InetSocketAddress(loc.getHostname(), loc.getPort()); + if (initialIsa.isUnresolved()) { + if (LOG.isTraceEnabled()) { + LOG.trace("failed to resolve bind address: " + loc.getHostname() + ":" + + loc.getPort() + ", so use default writer"); + } + wl = getNewWriter(tableNameBytes, family, conf, null); + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("use favored nodes writer: " + initialIsa.getHostString()); + } + wl = getNewWriter(tableNameBytes, family, conf, new InetSocketAddress[] { initialIsa + }); + } + } + } else { + wl = getNewWriter(tableNameBytes, 
family, conf, null);
+ }
+ }
+
+ // we now have the proper HFile writer. full steam ahead
+ kv.updateLatestStamp(this.now);
+ wl.writer.append(kv);
+ wl.written += length;
+
+ // Copy the row so we know when a row transitions.
+ this.previousRow = rowKey;
+ }
+
+ private void rollWriters() throws IOException {
+ for (WriterLength wl : this.writers.values()) {
+ if (wl.writer != null) {
+ LOG.info(
+ "Writer=" + wl.writer.getPath() + ((wl.written == 0)? "": ", wrote=" + wl.written));
+ close(wl.writer);
+ }
+ wl.writer = null;
+ wl.written = 0;
+ }
+ this.rollRequested = false;
+ }
+
+ /*
+ * Create a new StoreFile.Writer.
+ * @param family
+ * @return A WriterLength, containing a new StoreFile.Writer.
+ * @throws IOException
+ */
+ @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="BX_UNBOXING_IMMEDIATELY_REBOXED",
+ justification="Not important")
+ private WriterLength getNewWriter(byte[] tableName, byte[] family, Configuration
+ conf, InetSocketAddress[] favoredNodes) throws IOException {
+ byte[] tableAndFamily = getTableNameSuffixedWithFamily(tableName, family);
+ Path familydir = new Path(outputDir, Bytes.toString(family));
+ if (writeMultipleTables) {
+ familydir = new Path(outputDir,
+ new Path(Bytes.toString(tableName), Bytes.toString(family)));
+ }
+ WriterLength wl = new WriterLength();
+ Algorithm compression = compressionMap.get(tableAndFamily);
+ compression = compression == null ? defaultCompression : compression;
+ BloomType bloomType = bloomTypeMap.get(tableAndFamily);
+ bloomType = bloomType == null ? BloomType.NONE : bloomType;
+ Integer blockSize = blockSizeMap.get(tableAndFamily);
+ blockSize = blockSize == null ? HConstants.DEFAULT_BLOCKSIZE : blockSize;
+ DataBlockEncoding encoding = overriddenEncoding;
+ encoding = encoding == null ? datablockEncodingMap.get(tableAndFamily) : encoding;
+ encoding = encoding == null ? 
DataBlockEncoding.NONE : encoding; + Configuration tempConf = new Configuration(conf); + tempConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.0f); + HFileContextBuilder contextBuilder = new HFileContextBuilder() + .withCompression(compression) + .withChecksumType(HStore.getChecksumType(conf)) + .withBytesPerCheckSum(HStore.getBytesPerChecksum(conf)) + .withBlockSize(blockSize); + + if (HFile.getFormatVersion(conf) >= HFile.MIN_FORMAT_VERSION_WITH_TAGS) { + contextBuilder.withIncludesTags(true); + } + + contextBuilder.withDataBlockEncoding(encoding); + HFileContext hFileContext = contextBuilder.build(); + if (null == favoredNodes) { + wl.writer = + new StoreFileWriter.Builder(conf, new CacheConfig(tempConf), fs) + .withOutputDir(familydir).withBloomType(bloomType) + .withComparator(CellComparator.COMPARATOR).withFileContext(hFileContext).build(); + } else { + wl.writer = + new StoreFileWriter.Builder(conf, new CacheConfig(tempConf), new HFileSystem(fs)) + .withOutputDir(familydir).withBloomType(bloomType) + .withComparator(CellComparator.COMPARATOR).withFileContext(hFileContext) + .withFavoredNodes(favoredNodes).build(); + } + + this.writers.put(tableAndFamily, wl); + return wl; + } + + private void close(final StoreFileWriter w) throws IOException { + if (w != null) { + w.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY, + Bytes.toBytes(System.currentTimeMillis())); + w.appendFileInfo(StoreFile.BULKLOAD_TASK_KEY, + Bytes.toBytes(context.getTaskAttemptID().toString())); + w.appendFileInfo(StoreFile.MAJOR_COMPACTION_KEY, + Bytes.toBytes(true)); + w.appendFileInfo(StoreFile.EXCLUDE_FROM_MINOR_COMPACTION_KEY, + Bytes.toBytes(compactionExclude)); + w.appendTrackedTimestampsToMetadata(); + w.close(); + } + } + + @Override + public void close(TaskAttemptContext c) + throws IOException, InterruptedException { + for (WriterLength wl: this.writers.values()) { + close(wl.writer); + } + } + }; + } + + /** + * Configure block storage policy for CF after the directory is created. + */ + static void configureStoragePolicy(final Configuration conf, final FileSystem fs, + byte[] tableAndFamily, Path cfPath) { + if (null == conf || null == fs || null == tableAndFamily || null == cfPath) { + return; + } + + String policy = + conf.get(STORAGE_POLICY_PROPERTY_CF_PREFIX + Bytes.toString(tableAndFamily), + conf.get(STORAGE_POLICY_PROPERTY)); + FSUtils.setStoragePolicy(fs, cfPath, policy); + } + + /* + * Data structure to hold a Writer and amount of data written on it. + */ + static class WriterLength { + long written = 0; + StoreFileWriter writer = null; + } + + /** + * Return the start keys of all of the regions in this table, + * as a list of ImmutableBytesWritable. 
+ */ + private static List<ImmutableBytesWritable> getRegionStartKeys(List<RegionLocator> regionLocators, + boolean writeMultipleTables) + throws IOException { + + ArrayList<ImmutableBytesWritable> ret = new ArrayList<>(); + for(RegionLocator regionLocator : regionLocators) + { + TableName tableName = regionLocator.getName(); + LOG.info("Looking up current regions for table " + tableName); + byte[][] byteKeys = regionLocator.getStartKeys(); + for (byte[] byteKey : byteKeys) { + byte[] fullKey = byteKey; //HFileOutputFormat2 use case + if (writeMultipleTables) + { + //MultiTableHFileOutputFormat use case + fullKey = combineTableNameSuffix(tableName.getName(), byteKey); + } + if (LOG.isDebugEnabled()) { + LOG.debug("SplitPoint startkey for table [" + tableName + "]: [" + Bytes.toStringBinary + (fullKey) + "]"); + } + ret.add(new ImmutableBytesWritable(fullKey)); + } + } + return ret; + } + + /** + * Write out a {@link SequenceFile} that can be read by + * {@link TotalOrderPartitioner} that contains the split points in startKeys. + */ + @SuppressWarnings("deprecation") + private static void writePartitions(Configuration conf, Path partitionsPath, + List<ImmutableBytesWritable> startKeys, boolean writeMultipleTables) throws IOException { + LOG.info("Writing partition information to " + partitionsPath); + if (startKeys.isEmpty()) { + throw new IllegalArgumentException("No regions passed"); + } + + // We're generating a list of split points, and we don't ever + // have keys < the first region (which has an empty start key) + // so we need to remove it. Otherwise we would end up with an + // empty reducer with index 0 + TreeSet<ImmutableBytesWritable> sorted = new TreeSet<>(startKeys); + ImmutableBytesWritable first = sorted.first(); + if (writeMultipleTables) { + first = new ImmutableBytesWritable(MultiTableHFileOutputFormat.getSuffix(sorted.first + ().get())); + } + if (!first.equals(HConstants.EMPTY_BYTE_ARRAY)) { + throw new IllegalArgumentException( + "First region of table should have empty start key. Instead has: " + + Bytes.toStringBinary(first.get())); + } + sorted.remove(sorted.first()); + + // Write the actual file + FileSystem fs = partitionsPath.getFileSystem(conf); + SequenceFile.Writer writer = SequenceFile.createWriter( + fs, conf, partitionsPath, ImmutableBytesWritable.class, + NullWritable.class); + + try { + for (ImmutableBytesWritable startKey : sorted) { + writer.append(startKey, NullWritable.get()); + } + } finally { + writer.close(); + } + } + + /** + * Configure a MapReduce Job to perform an incremental load into the given + * table. This + * <ul> + * <li>Inspects the table to configure a total order partitioner</li> + * <li>Uploads the partitions file to the cluster and adds it to the DistributedCache</li> + * <li>Sets the number of reduce tasks to match the current number of regions</li> + * <li>Sets the output key/value class to match HFileOutputFormat2's requirements</li> + * <li>Sets the reducer up to perform the appropriate sorting (either KeyValueSortReducer or + * PutSortReducer)</li> + * </ul> + * The user should be sure to set the map output value class to either KeyValue or Put before + * running this function. + */ + public static void configureIncrementalLoad(Job job, Table table, RegionLocator regionLocator) + throws IOException { + configureIncrementalLoad(job, table.getDescriptor(), regionLocator); + } + + /** + * Configure a MapReduce Job to perform an incremental load into the given + * table. 
This + * <ul> + * <li>Inspects the table to configure a total order partitioner</li> + * <li>Uploads the partitions file to the cluster and adds it to the DistributedCache</li> + * <li>Sets the number of reduce tasks to match the current number of regions</li> + * <li>Sets the output key/value class to match HFileOutputFormat2's requirements</li> + * <li>Sets the reducer up to perform the appropriate sorting (either KeyValueSortReducer or + * PutSortReducer)</li> + * </ul> + * The user should be sure to set the map output value class to either KeyValue or Put before + * running this function. + */ + public static void configureIncrementalLoad(Job job, TableDescriptor tableDescriptor, + RegionLocator regionLocator) throws IOException { + ArrayList<TableInfo> singleTableInfo = new ArrayList<>(); + singleTableInfo.add(new TableInfo(tableDescriptor, regionLocator)); + configureIncrementalLoad(job, singleTableInfo, HFileOutputFormat2.class); + } + + static void configureIncrementalLoad(Job job, List<TableInfo> multiTableInfo, Class<? extends OutputFormat<?, ?>> cls) throws IOException { + Configuration conf = job.getConfiguration(); + job.setOutputKeyClass(ImmutableBytesWritable.class); + job.setOutputValueClass(KeyValue.class); + job.setOutputFormatClass(cls); + + if (multiTableInfo.stream().distinct().count() != multiTableInfo.size()) { + throw new IllegalArgumentException("Duplicate entries found in TableInfo argument"); + } + boolean writeMultipleTables = false; + if (MultiTableHFileOutputFormat.class.equals(cls)) { + writeMultipleTables = true; + conf.setBoolean(MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY, true); + } + // Based on the configured map output class, set the correct reducer to properly + // sort the incoming values. + // TODO it would be nice to pick one or the other of these formats. + if (KeyValue.class.equals(job.getMapOutputValueClass())) { + job.setReducerClass(KeyValueSortReducer.class); + } else if (Put.class.equals(job.getMapOutputValueClass())) { + job.setReducerClass(PutSortReducer.class); + } else if (Text.class.equals(job.getMapOutputValueClass())) { + job.setReducerClass(TextSortReducer.class); + } else { + LOG.warn("Unknown map output value type:" + job.getMapOutputValueClass()); + } + + conf.setStrings("io.serializations", conf.get("io.serializations"), + MutationSerialization.class.getName(), ResultSerialization.class.getName(), + KeyValueSerialization.class.getName()); + + if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) { + LOG.info("bulkload locality sensitive enabled"); + } + + /* Now get the region start keys for every table required */ + List<String> allTableNames = new ArrayList<>(multiTableInfo.size()); + List<RegionLocator> regionLocators = new ArrayList<>( multiTableInfo.size()); + List<TableDescriptor> tableDescriptors = new ArrayList<>( multiTableInfo.size()); + + for( TableInfo tableInfo : multiTableInfo ) + { + regionLocators.add(tableInfo.getRegionLocator()); + allTableNames.add(tableInfo.getRegionLocator().getName().getNameAsString()); + tableDescriptors.add(tableInfo.getTableDescriptor()); + } + // Record tablenames for creating writer by favored nodes, and decoding compression, block size and other attributes of columnfamily per table + conf.set(OUTPUT_TABLE_NAME_CONF_KEY, StringUtils.join(allTableNames, Bytes + .toString(tableSeparator))); + List<ImmutableBytesWritable> startKeys = getRegionStartKeys(regionLocators, writeMultipleTables); + // Use table's region boundaries for TOP split points. 
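+ // (Each region start key becomes one TotalOrderPartitioner split point, so every
+ // reducer produces HFiles whose key range falls inside a single existing region.)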
+ LOG.info("Configuring " + startKeys.size() + " reduce partitions " + + "to match current region count for all tables"); + job.setNumReduceTasks(startKeys.size()); + + configurePartitioner(job, startKeys, writeMultipleTables); + // Set compression algorithms based on column families + + conf.set(COMPRESSION_FAMILIES_CONF_KEY, serializeColumnFamilyAttribute(compressionDetails, + tableDescriptors)); + conf.set(BLOCK_SIZE_FAMILIES_CONF_KEY, serializeColumnFamilyAttribute(blockSizeDetails, + tableDescriptors)); + conf.set(BLOOM_TYPE_FAMILIES_CONF_KEY, serializeColumnFamilyAttribute(bloomTypeDetails, + tableDescriptors)); + conf.set(DATABLOCK_ENCODING_FAMILIES_CONF_KEY, + serializeColumnFamilyAttribute(dataBlockEncodingDetails, tableDescriptors)); + + TableMapReduceUtil.addDependencyJars(job); + TableMapReduceUtil.initCredentials(job); + LOG.info("Incremental output configured for tables: " + StringUtils.join(allTableNames, ",")); + } + + public static void configureIncrementalLoadMap(Job job, TableDescriptor tableDescriptor) throws + IOException { + Configuration conf = job.getConfiguration(); + + job.setOutputKeyClass(ImmutableBytesWritable.class); + job.setOutputValueClass(KeyValue.class); + job.setOutputFormatClass(HFileOutputFormat2.class); + + ArrayList<TableDescriptor> singleTableDescriptor = new ArrayList<>(1); + singleTableDescriptor.add(tableDescriptor); + + conf.set(OUTPUT_TABLE_NAME_CONF_KEY, tableDescriptor.getTableName().getNameAsString()); + // Set compression algorithms based on column families + conf.set(COMPRESSION_FAMILIES_CONF_KEY, + serializeColumnFamilyAttribute(compressionDetails, singleTableDescriptor)); + conf.set(BLOCK_SIZE_FAMILIES_CONF_KEY, + serializeColumnFamilyAttribute(blockSizeDetails, singleTableDescriptor)); + conf.set(BLOOM_TYPE_FAMILIES_CONF_KEY, + serializeColumnFamilyAttribute(bloomTypeDetails, singleTableDescriptor)); + conf.set(DATABLOCK_ENCODING_FAMILIES_CONF_KEY, + serializeColumnFamilyAttribute(dataBlockEncodingDetails, singleTableDescriptor)); + + TableMapReduceUtil.addDependencyJars(job); + TableMapReduceUtil.initCredentials(job); + LOG.info("Incremental table " + tableDescriptor.getTableName() + " output configured."); + } + + /** + * Runs inside the task to deserialize column family to compression algorithm + * map from the configuration. + * + * @param conf to read the serialized values from + * @return a map from column family to the configured compression algorithm + */ + @VisibleForTesting + static Map<byte[], Algorithm> createFamilyCompressionMap(Configuration + conf) { + Map<byte[], String> stringMap = createFamilyConfValueMap(conf, + COMPRESSION_FAMILIES_CONF_KEY); + Map<byte[], Algorithm> compressionMap = new TreeMap<>(Bytes.BYTES_COMPARATOR); + for (Map.Entry<byte[], String> e : stringMap.entrySet()) { + Algorithm algorithm = HFileWriterImpl.compressionByName(e.getValue()); + compressionMap.put(e.getKey(), algorithm); + } + return compressionMap; + } + + /** + * Runs inside the task to deserialize column family to bloom filter type + * map from the configuration. 
+ *
+ * @param conf to read the serialized values from
+ * @return a map from column family to the configured bloom filter type
+ */
+ @VisibleForTesting
+ static Map<byte[], BloomType> createFamilyBloomTypeMap(Configuration conf) {
+ Map<byte[], String> stringMap = createFamilyConfValueMap(conf,
+ BLOOM_TYPE_FAMILIES_CONF_KEY);
+ Map<byte[], BloomType> bloomTypeMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+ for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
+ BloomType bloomType = BloomType.valueOf(e.getValue());
+ bloomTypeMap.put(e.getKey(), bloomType);
+ }
+ return bloomTypeMap;
+ }
+
+ /**
+ * Runs inside the task to deserialize column family to block size
+ * map from the configuration.
+ *
+ * @param conf to read the serialized values from
+ * @return a map from column family to the configured block size
+ */
+ @VisibleForTesting
+ static Map<byte[], Integer> createFamilyBlockSizeMap(Configuration conf) {
+ Map<byte[], String> stringMap = createFamilyConfValueMap(conf,
+ BLOCK_SIZE_FAMILIES_CONF_KEY);
+ Map<byte[], Integer> blockSizeMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+ for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
+ Integer blockSize = Integer.parseInt(e.getValue());
+ blockSizeMap.put(e.getKey(), blockSize);
+ }
+ return blockSizeMap;
+ }
+
+ /**
+ * Runs inside the task to deserialize column family to data block encoding
+ * type map from the configuration.
+ *
+ * @param conf to read the serialized values from
+ * @return a map from column family to HFileDataBlockEncoder for the
+ * configured data block type for the family
+ */
+ @VisibleForTesting
+ static Map<byte[], DataBlockEncoding> createFamilyDataBlockEncodingMap(
+ Configuration conf) {
+ Map<byte[], String> stringMap = createFamilyConfValueMap(conf,
+ DATABLOCK_ENCODING_FAMILIES_CONF_KEY);
+ Map<byte[], DataBlockEncoding> encoderMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+ for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
+ encoderMap.put(e.getKey(), DataBlockEncoding.valueOf((e.getValue())));
+ }
+ return encoderMap;
+ }
+
+
+ /**
+ * Run inside the task to deserialize column family to given conf value map.
+ *
+ * @param conf to read the serialized values from
+ * @param confName conf key to read from the configuration
+ * @return a map of column family to the given configuration value
+ */
+ private static Map<byte[], String> createFamilyConfValueMap(
+ Configuration conf, String confName) {
+ Map<byte[], String> confValMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+ String confVal = conf.get(confName, "");
+ for (String familyConf : confVal.split("&")) {
+ String[] familySplit = familyConf.split("=");
+ if (familySplit.length != 2) {
+ continue;
+ }
+ try {
+ confValMap.put(URLDecoder.decode(familySplit[0], "UTF-8").getBytes(StandardCharsets.UTF_8),
+ URLDecoder.decode(familySplit[1], "UTF-8"));
+ } catch (UnsupportedEncodingException e) {
+ // will not happen with UTF-8 encoding
+ throw new AssertionError(e);
+ }
+ }
+ return confValMap;
+ }
+
+ /**
+ * Configure <code>job</code> with a TotalOrderPartitioner, partitioning against
+ * <code>splitPoints</code>. Cleans up the partitions file after job exits. 
+ */ + static void configurePartitioner(Job job, List<ImmutableBytesWritable> splitPoints, boolean + writeMultipleTables) + throws IOException { + Configuration conf = job.getConfiguration(); + // create the partitions file + FileSystem fs = FileSystem.get(conf); + String hbaseTmpFsDir = + conf.get(HConstants.TEMPORARY_FS_DIRECTORY_KEY, + HConstants.DEFAULT_TEMPORARY_HDFS_DIRECTORY); + Path partitionsPath = new Path(hbaseTmpFsDir, "partitions_" + UUID.randomUUID()); + fs.makeQualified(partitionsPath); + writePartitions(conf, partitionsPath, splitPoints, writeMultipleTables); + fs.deleteOnExit(partitionsPath); + + // configure job to use it + job.setPartitionerClass(TotalOrderPartitioner.class); + TotalOrderPartitioner.setPartitionFile(conf, partitionsPath); + } + + @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "RCN_REDUNDANT_NULLCHECK_OF_NONNULL_VALUE") + @VisibleForTesting + static String serializeColumnFamilyAttribute(Function<ColumnFamilyDescriptor, String> fn, List<TableDescriptor> allTables) + throws UnsupportedEncodingException { + StringBuilder attributeValue = new StringBuilder(); + int i = 0; + for (TableDescriptor tableDescriptor : allTables) { + if (tableDescriptor == null) { + // could happen with mock table instance + // CODEREVIEW: Can I set an empty string in conf if mock table instance? + return ""; + } + for (ColumnFamilyDescriptor familyDescriptor : tableDescriptor.getColumnFamilies()) { + if (i++ > 0) { + attributeValue.append('&'); + } + attributeValue.append(URLEncoder.encode( + Bytes.toString(combineTableNameSuffix(tableDescriptor.getTableName().getName(), familyDescriptor.getName())), + "UTF-8")); + attributeValue.append('='); + attributeValue.append(URLEncoder.encode(fn.apply(familyDescriptor), "UTF-8")); + } + } + // Get rid of the last ampersand + return attributeValue.toString(); + } + + /** + * Serialize column family to compression algorithm map to configuration. + * Invoked while configuring the MR job for incremental load. + * + * @param tableDescriptor to read the properties from + * @param conf to persist serialized values into + * @throws IOException + * on failure to read column family descriptors + */ + @VisibleForTesting + static Function<ColumnFamilyDescriptor, String> compressionDetails = familyDescriptor -> + familyDescriptor.getCompressionType().getName(); + + /** + * Serialize column family to block size map to configuration. Invoked while + * configuring the MR job for incremental load. + * + * @param tableDescriptor + * to read the properties from + * @param conf + * to persist serialized values into + * + * @throws IOException + * on failure to read column family descriptors + */ + @VisibleForTesting + static Function<ColumnFamilyDescriptor, String> blockSizeDetails = familyDescriptor -> String + .valueOf(familyDescriptor.getBlocksize()); + + /** + * Serialize column family to bloom type map to configuration. Invoked while + * configuring the MR job for incremental load. 
+ * + * @param tableDescriptor + * to read the properties from + * @param conf + * to persist serialized values into + * + * @throws IOException + * on failure to read column family descriptors + */ + @VisibleForTesting + static Function<ColumnFamilyDescriptor, String> bloomTypeDetails = familyDescriptor -> { + String bloomType = familyDescriptor.getBloomFilterType().toString(); + if (bloomType == null) { + bloomType = ColumnFamilyDescriptorBuilder.DEFAULT_BLOOMFILTER.name(); + } + return bloomType; + }; + + /** + * Serialize column family to data block encoding map to configuration. + * Invoked while configuring the MR job for incremental load. + * + * @param tableDescriptor + * to read the properties from + * @param conf + * to persist serialized values into + * @throws IOException + * on failure to read column family descriptors + */ + @VisibleForTesting + static Function<ColumnFamilyDescriptor, String> dataBlockEncodingDetails = familyDescriptor -> { + DataBlockEncoding encoding = familyDescriptor.getDataBlockEncoding(); + if (encoding == null) { + encoding = DataBlockEncoding.NONE; + } + return encoding.toString(); + }; + +}
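
For reference, a minimal driver sketch showing how the public configureIncrementalLoad(Job, Table, RegionLocator) entry point above is typically used to prepare HFiles for bulk load. The class name BulkLoadDriver, the CsvToPutMapper, the table name "usertable", and the input/output paths are illustrative assumptions rather than part of this change:

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class BulkLoadDriver {

  // Hypothetical mapper: turns "rowkey,value" text lines into Puts for family "f".
  public static class CsvToPutMapper
      extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
    @Override
    protected void map(LongWritable key, Text line, Context context)
        throws IOException, InterruptedException {
      String[] parts = line.toString().split(",", 2);
      if (parts.length != 2) {
        return; // skip malformed lines
      }
      byte[] row = Bytes.toBytes(parts[0]);
      Put put = new Put(row);
      put.addColumn(Bytes.toBytes("f"), Bytes.toBytes("q"), Bytes.toBytes(parts[1]));
      context.write(new ImmutableBytesWritable(row), put);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    Job job = Job.getInstance(conf, "hfile-bulkload");
    job.setJarByClass(BulkLoadDriver.class);
    job.setMapperClass(CsvToPutMapper.class);
    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
    job.setMapOutputValueClass(Put.class); // Put => PutSortReducer is selected
    job.setInputFormatClass(TextInputFormat.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    TableName tableName = TableName.valueOf("usertable"); // assumed table name
    try (Connection conn = ConnectionFactory.createConnection(conf);
         Table table = conn.getTable(tableName);
         RegionLocator locator = conn.getRegionLocator(tableName)) {
      // Sets the partitioner, reducer, output format and serializations as shown above.
      HFileOutputFormat2.configureIncrementalLoad(job, table, locator);
    }
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

Because the map output value class is Put, configureIncrementalLoad wires in PutSortReducer and sets one reduce task per region of the target table; the HFile directory produced under args[1] can then be handed to the bulk load tool (LoadIncrementalHFiles).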
