http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java ---------------------------------------------------------------------- diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java new file mode 100644 index 0000000..6b5cbe2 --- /dev/null +++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java @@ -0,0 +1,915 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.hbase.util; + +import java.io.IOException; +import java.io.InterruptedIOException; +import java.lang.reflect.Constructor; +import java.security.SecureRandom; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Properties; +import java.util.Random; +import java.util.concurrent.atomic.AtomicReference; + +import javax.crypto.spec.SecretKeySpec; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HBaseInterfaceAudience; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.Durability; +import org.apache.hadoop.hbase.client.TableDescriptor; +import org.apache.hadoop.hbase.io.compress.Compression; +import org.apache.hadoop.hbase.io.crypto.Cipher; +import org.apache.hadoop.hbase.io.crypto.Encryption; +import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; +import org.apache.hadoop.hbase.regionserver.BloomType; +import org.apache.hadoop.hbase.security.EncryptionUtil; +import org.apache.hadoop.hbase.security.HBaseKerberosUtils; +import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.security.access.AccessControlClient; +import org.apache.hadoop.hbase.security.access.Permission; +import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator; +import org.apache.hadoop.hbase.util.test.LoadTestDataGeneratorWithACL; +import org.apache.hadoop.util.ToolRunner; + +/** + * A command-line utility that reads, writes, and verifies data. 
Unlike + * {@link org.apache.hadoop.hbase.PerformanceEvaluation}, this tool validates the data written, + * and supports simultaneously writing and reading the same set of keys. + */ +@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS) +public class LoadTestTool extends AbstractHBaseTool { + + private static final Log LOG = LogFactory.getLog(LoadTestTool.class); + private static final String COLON = ":"; + + /** Table name for the test */ + private TableName tableName; + + /** Column families for the test */ + private byte[][] families; + + /** Table name to use of not overridden on the command line */ + protected static final String DEFAULT_TABLE_NAME = "cluster_test"; + + /** The default data size if not specified */ + protected static final int DEFAULT_DATA_SIZE = 64; + + /** The number of reader/writer threads if not specified */ + protected static final int DEFAULT_NUM_THREADS = 20; + + /** Usage string for the load option */ + protected static final String OPT_USAGE_LOAD = + "<avg_cols_per_key>:<avg_data_size>" + + "[:<#threads=" + DEFAULT_NUM_THREADS + ">]"; + + /** Usage string for the read option */ + protected static final String OPT_USAGE_READ = + "<verify_percent>[:<#threads=" + DEFAULT_NUM_THREADS + ">]"; + + /** Usage string for the update option */ + protected static final String OPT_USAGE_UPDATE = + "<update_percent>[:<#threads=" + DEFAULT_NUM_THREADS + + ">][:<#whether to ignore nonce collisions=0>]"; + + protected static final String OPT_USAGE_BLOOM = "Bloom filter type, one of " + + Arrays.toString(BloomType.values()); + + protected static final String OPT_USAGE_COMPRESSION = "Compression type, " + + "one of " + Arrays.toString(Compression.Algorithm.values()); + + public static final String OPT_BLOOM = "bloom"; + public static final String OPT_COMPRESSION = "compression"; + public static final String OPT_DEFERRED_LOG_FLUSH = "deferredlogflush"; + public static final String OPT_DEFERRED_LOG_FLUSH_USAGE = "Enable deferred log flush."; + + public static final String OPT_INMEMORY = "in_memory"; + public static final String OPT_USAGE_IN_MEMORY = "Tries to keep the HFiles of the CF " + + "inmemory as far as possible. Not guaranteed that reads are always served from inmemory"; + + public static final String OPT_GENERATOR = "generator"; + public static final String OPT_GENERATOR_USAGE = "The class which generates load for the tool." 
+ + " Any args for this class can be passed as colon separated after class name"; + + public static final String OPT_WRITER = "writer"; + public static final String OPT_WRITER_USAGE = "The class for executing the write requests"; + + public static final String OPT_UPDATER = "updater"; + public static final String OPT_UPDATER_USAGE = "The class for executing the update requests"; + + public static final String OPT_READER = "reader"; + public static final String OPT_READER_USAGE = "The class for executing the read requests"; + + protected static final String OPT_KEY_WINDOW = "key_window"; + protected static final String OPT_WRITE = "write"; + protected static final String OPT_MAX_READ_ERRORS = "max_read_errors"; + public static final String OPT_MULTIPUT = "multiput"; + public static final String OPT_MULTIGET = "multiget_batchsize"; + protected static final String OPT_NUM_KEYS = "num_keys"; + protected static final String OPT_READ = "read"; + protected static final String OPT_START_KEY = "start_key"; + public static final String OPT_TABLE_NAME = "tn"; + public static final String OPT_COLUMN_FAMILIES = "families"; + protected static final String OPT_ZK_QUORUM = "zk"; + protected static final String OPT_ZK_PARENT_NODE = "zk_root"; + protected static final String OPT_SKIP_INIT = "skip_init"; + protected static final String OPT_INIT_ONLY = "init_only"; + protected static final String NUM_TABLES = "num_tables"; + protected static final String OPT_REGIONS_PER_SERVER = "regions_per_server"; + protected static final String OPT_BATCHUPDATE = "batchupdate"; + protected static final String OPT_UPDATE = "update"; + + public static final String OPT_ENCRYPTION = "encryption"; + protected static final String OPT_ENCRYPTION_USAGE = + "Enables transparent encryption on the test table, one of " + + Arrays.toString(Encryption.getSupportedCiphers()); + + public static final String OPT_NUM_REGIONS_PER_SERVER = "num_regions_per_server"; + protected static final String OPT_NUM_REGIONS_PER_SERVER_USAGE + = "Desired number of regions per region server. 
Defaults to 5."; + public static int DEFAULT_NUM_REGIONS_PER_SERVER = 5; + + public static final String OPT_REGION_REPLICATION = "region_replication"; + protected static final String OPT_REGION_REPLICATION_USAGE = + "Desired number of replicas per region"; + + public static final String OPT_REGION_REPLICA_ID = "region_replica_id"; + protected static final String OPT_REGION_REPLICA_ID_USAGE = + "Region replica id to do the reads from"; + + public static final String OPT_MOB_THRESHOLD = "mob_threshold"; + protected static final String OPT_MOB_THRESHOLD_USAGE = + "Desired cell size to exceed in bytes that will use the MOB write path"; + + protected static final long DEFAULT_START_KEY = 0; + + /** This will be removed as we factor out the dependency on command line */ + protected CommandLine cmd; + + protected MultiThreadedWriter writerThreads = null; + protected MultiThreadedReader readerThreads = null; + protected MultiThreadedUpdater updaterThreads = null; + + protected long startKey, endKey; + + protected boolean isWrite, isRead, isUpdate; + protected boolean deferredLogFlush; + + // Column family options + protected DataBlockEncoding dataBlockEncodingAlgo; + protected Compression.Algorithm compressAlgo; + protected BloomType bloomType; + private boolean inMemoryCF; + + private User userOwner; + // Writer options + protected int numWriterThreads = DEFAULT_NUM_THREADS; + protected int minColsPerKey, maxColsPerKey; + protected int minColDataSize = DEFAULT_DATA_SIZE, maxColDataSize = DEFAULT_DATA_SIZE; + protected boolean isMultiPut; + + // Updater options + protected int numUpdaterThreads = DEFAULT_NUM_THREADS; + protected int updatePercent; + protected boolean ignoreConflicts = false; + protected boolean isBatchUpdate; + + // Reader options + private int numReaderThreads = DEFAULT_NUM_THREADS; + private int keyWindow = MultiThreadedReader.DEFAULT_KEY_WINDOW; + private int multiGetBatchSize = MultiThreadedReader.DEFAULT_BATCH_SIZE; + private int maxReadErrors = MultiThreadedReader.DEFAULT_MAX_ERRORS; + private int verifyPercent; + + private int numTables = 1; + + private String superUser; + + private String userNames; + //This file is used to read authentication information in secure clusters. + private String authnFileName; + + private int numRegionsPerServer = DEFAULT_NUM_REGIONS_PER_SERVER; + private int regionReplication = -1; // not set + private int regionReplicaId = -1; // not set + + private int mobThreshold = -1; // not set + + // TODO: refactor LoadTestToolImpl somewhere to make the usage from tests less bad, + // console tool itself should only be used from console. + protected boolean isSkipInit = false; + protected boolean isInitOnly = false; + + protected Cipher cipher = null; + + protected String[] splitColonSeparated(String option, + int minNumCols, int maxNumCols) { + String optVal = cmd.getOptionValue(option); + String[] cols = optVal.split(COLON); + if (cols.length < minNumCols || cols.length > maxNumCols) { + throw new IllegalArgumentException("Expected at least " + + minNumCols + " columns but no more than " + maxNumCols + + " in the colon-separated value '" + optVal + "' of the " + + "-" + option + " option"); + } + return cols; + } + + protected int getNumThreads(String numThreadsStr) { + return parseInt(numThreadsStr, 1, Short.MAX_VALUE); + } + + public byte[][] getColumnFamilies() { + return families; + } + + /** + * Apply column family options such as Bloom filters, compression, and data + * block encoding. 
+ */ + protected void applyColumnFamilyOptions(TableName tableName, + byte[][] columnFamilies) throws IOException { + try (Connection conn = ConnectionFactory.createConnection(conf); + Admin admin = conn.getAdmin()) { + TableDescriptor tableDesc = admin.getTableDescriptor(tableName); + LOG.info("Disabling table " + tableName); + admin.disableTable(tableName); + for (byte[] cf : columnFamilies) { + HColumnDescriptor columnDesc = (HColumnDescriptor) tableDesc.getColumnFamily(cf); + boolean isNewCf = columnDesc == null; + if (isNewCf) { + columnDesc = new HColumnDescriptor(cf); + } + if (bloomType != null) { + columnDesc.setBloomFilterType(bloomType); + } + if (compressAlgo != null) { + columnDesc.setCompressionType(compressAlgo); + } + if (dataBlockEncodingAlgo != null) { + columnDesc.setDataBlockEncoding(dataBlockEncodingAlgo); + } + if (inMemoryCF) { + columnDesc.setInMemory(inMemoryCF); + } + if (cipher != null) { + byte[] keyBytes = new byte[cipher.getKeyLength()]; + new SecureRandom().nextBytes(keyBytes); + columnDesc.setEncryptionType(cipher.getName()); + columnDesc.setEncryptionKey( + EncryptionUtil.wrapKey(conf, + User.getCurrent().getShortName(), + new SecretKeySpec(keyBytes, + cipher.getName()))); + } + if (mobThreshold >= 0) { + columnDesc.setMobEnabled(true); + columnDesc.setMobThreshold(mobThreshold); + } + + if (isNewCf) { + admin.addColumnFamily(tableName, columnDesc); + } else { + admin.modifyColumnFamily(tableName, columnDesc); + } + } + LOG.info("Enabling table " + tableName); + admin.enableTable(tableName); + } + } + + @Override + protected void addOptions() { + addOptWithArg(OPT_ZK_QUORUM, "ZK quorum as comma-separated host names " + + "without port numbers"); + addOptWithArg(OPT_ZK_PARENT_NODE, "name of parent znode in zookeeper"); + addOptWithArg(OPT_TABLE_NAME, "The name of the table to read or write"); + addOptWithArg(OPT_COLUMN_FAMILIES, "The name of the column families to use separated by comma"); + addOptWithArg(OPT_WRITE, OPT_USAGE_LOAD); + addOptWithArg(OPT_READ, OPT_USAGE_READ); + addOptWithArg(OPT_UPDATE, OPT_USAGE_UPDATE); + addOptNoArg(OPT_INIT_ONLY, "Initialize the test table only, don't do any loading"); + addOptWithArg(OPT_BLOOM, OPT_USAGE_BLOOM); + addOptWithArg(OPT_COMPRESSION, OPT_USAGE_COMPRESSION); + addOptWithArg(HFileTestUtil.OPT_DATA_BLOCK_ENCODING, HFileTestUtil.OPT_DATA_BLOCK_ENCODING_USAGE); + addOptWithArg(OPT_MAX_READ_ERRORS, "The maximum number of read errors " + + "to tolerate before terminating all reader threads. The default is " + + MultiThreadedReader.DEFAULT_MAX_ERRORS + "."); + addOptWithArg(OPT_MULTIGET, "Whether to use multi-gets as opposed to " + + "separate gets for every column in a row"); + addOptWithArg(OPT_KEY_WINDOW, "The 'key window' to maintain between " + + "reads and writes for concurrent write/read workload. The default " + + "is " + MultiThreadedReader.DEFAULT_KEY_WINDOW + "."); + + addOptNoArg(OPT_MULTIPUT, "Whether to use multi-puts as opposed to " + + "separate puts for every column in a row"); + addOptNoArg(OPT_BATCHUPDATE, "Whether to use batch as opposed to " + + "separate updates for every column in a row"); + addOptNoArg(OPT_INMEMORY, OPT_USAGE_IN_MEMORY); + addOptWithArg(OPT_GENERATOR, OPT_GENERATOR_USAGE); + addOptWithArg(OPT_WRITER, OPT_WRITER_USAGE); + addOptWithArg(OPT_UPDATER, OPT_UPDATER_USAGE); + addOptWithArg(OPT_READER, OPT_READER_USAGE); + + addOptWithArg(OPT_NUM_KEYS, "The number of keys to read/write"); + addOptWithArg(OPT_START_KEY, "The first key to read/write " + + "(a 0-based index). 
The default value is " + + DEFAULT_START_KEY + "."); + addOptNoArg(OPT_SKIP_INIT, "Skip the initialization; assume test table " + + "already exists"); + + addOptWithArg(NUM_TABLES, + "A positive integer number. When a number n is speicfied, load test " + + "tool will load n table parallely. -tn parameter value becomes " + + "table name prefix. Each table name is in format <tn>_1...<tn>_n"); + + addOptWithArg(OPT_REGIONS_PER_SERVER, + "A positive integer number. When a number n is specified, load test " + + "tool will create the test table with n regions per server"); + + addOptWithArg(OPT_ENCRYPTION, OPT_ENCRYPTION_USAGE); + addOptNoArg(OPT_DEFERRED_LOG_FLUSH, OPT_DEFERRED_LOG_FLUSH_USAGE); + addOptWithArg(OPT_NUM_REGIONS_PER_SERVER, OPT_NUM_REGIONS_PER_SERVER_USAGE); + addOptWithArg(OPT_REGION_REPLICATION, OPT_REGION_REPLICATION_USAGE); + addOptWithArg(OPT_REGION_REPLICA_ID, OPT_REGION_REPLICA_ID_USAGE); + addOptWithArg(OPT_MOB_THRESHOLD, OPT_MOB_THRESHOLD_USAGE); + } + + @Override + protected void processOptions(CommandLine cmd) { + this.cmd = cmd; + + tableName = TableName.valueOf(cmd.getOptionValue(OPT_TABLE_NAME, + DEFAULT_TABLE_NAME)); + + if (cmd.hasOption(OPT_COLUMN_FAMILIES)) { + String[] list = cmd.getOptionValue(OPT_COLUMN_FAMILIES).split(","); + families = new byte[list.length][]; + for (int i = 0; i < list.length; i++) { + families[i] = Bytes.toBytes(list[i]); + } + } else { + families = HFileTestUtil.DEFAULT_COLUMN_FAMILIES; + } + + isWrite = cmd.hasOption(OPT_WRITE); + isRead = cmd.hasOption(OPT_READ); + isUpdate = cmd.hasOption(OPT_UPDATE); + isInitOnly = cmd.hasOption(OPT_INIT_ONLY); + deferredLogFlush = cmd.hasOption(OPT_DEFERRED_LOG_FLUSH); + + if (!isWrite && !isRead && !isUpdate && !isInitOnly) { + throw new IllegalArgumentException("Either -" + OPT_WRITE + " or " + + "-" + OPT_UPDATE + " or -" + OPT_READ + " has to be specified"); + } + + if (isInitOnly && (isRead || isWrite || isUpdate)) { + throw new IllegalArgumentException(OPT_INIT_ONLY + " cannot be specified with" + + " either -" + OPT_WRITE + " or -" + OPT_UPDATE + " or -" + OPT_READ); + } + + if (!isInitOnly) { + if (!cmd.hasOption(OPT_NUM_KEYS)) { + throw new IllegalArgumentException(OPT_NUM_KEYS + " must be specified in " + + "read or write mode"); + } + startKey = parseLong(cmd.getOptionValue(OPT_START_KEY, + String.valueOf(DEFAULT_START_KEY)), 0, Long.MAX_VALUE); + long numKeys = parseLong(cmd.getOptionValue(OPT_NUM_KEYS), 1, + Long.MAX_VALUE - startKey); + endKey = startKey + numKeys; + isSkipInit = cmd.hasOption(OPT_SKIP_INIT); + System.out.println("Key range: [" + startKey + ".." + (endKey - 1) + "]"); + } + + parseColumnFamilyOptions(cmd); + + if (isWrite) { + String[] writeOpts = splitColonSeparated(OPT_WRITE, 2, 3); + + int colIndex = 0; + minColsPerKey = 1; + maxColsPerKey = 2 * Integer.parseInt(writeOpts[colIndex++]); + int avgColDataSize = + parseInt(writeOpts[colIndex++], 1, Integer.MAX_VALUE); + minColDataSize = avgColDataSize / 2; + maxColDataSize = avgColDataSize * 3 / 2; + + if (colIndex < writeOpts.length) { + numWriterThreads = getNumThreads(writeOpts[colIndex++]); + } + + isMultiPut = cmd.hasOption(OPT_MULTIPUT); + + mobThreshold = -1; + if (cmd.hasOption(OPT_MOB_THRESHOLD)) { + mobThreshold = Integer.parseInt(cmd.getOptionValue(OPT_MOB_THRESHOLD)); + } + + System.out.println("Multi-puts: " + isMultiPut); + System.out.println("Columns per key: " + minColsPerKey + ".." + + maxColsPerKey); + System.out.println("Data size per column: " + minColDataSize + ".." 
+ + maxColDataSize); + } + + if (isUpdate) { + String[] mutateOpts = splitColonSeparated(OPT_UPDATE, 1, 3); + int colIndex = 0; + updatePercent = parseInt(mutateOpts[colIndex++], 0, 100); + if (colIndex < mutateOpts.length) { + numUpdaterThreads = getNumThreads(mutateOpts[colIndex++]); + } + if (colIndex < mutateOpts.length) { + ignoreConflicts = parseInt(mutateOpts[colIndex++], 0, 1) == 1; + } + + isBatchUpdate = cmd.hasOption(OPT_BATCHUPDATE); + + System.out.println("Batch updates: " + isBatchUpdate); + System.out.println("Percent of keys to update: " + updatePercent); + System.out.println("Updater threads: " + numUpdaterThreads); + System.out.println("Ignore nonce conflicts: " + ignoreConflicts); + } + + if (isRead) { + String[] readOpts = splitColonSeparated(OPT_READ, 1, 2); + int colIndex = 0; + verifyPercent = parseInt(readOpts[colIndex++], 0, 100); + if (colIndex < readOpts.length) { + numReaderThreads = getNumThreads(readOpts[colIndex++]); + } + + if (cmd.hasOption(OPT_MAX_READ_ERRORS)) { + maxReadErrors = parseInt(cmd.getOptionValue(OPT_MAX_READ_ERRORS), + 0, Integer.MAX_VALUE); + } + + if (cmd.hasOption(OPT_KEY_WINDOW)) { + keyWindow = parseInt(cmd.getOptionValue(OPT_KEY_WINDOW), + 0, Integer.MAX_VALUE); + } + + if (cmd.hasOption(OPT_MULTIGET)) { + multiGetBatchSize = parseInt(cmd.getOptionValue(OPT_MULTIGET), + 0, Integer.MAX_VALUE); + } + + System.out.println("Multi-gets (value of 1 means no multigets): " + multiGetBatchSize); + System.out.println("Percent of keys to verify: " + verifyPercent); + System.out.println("Reader threads: " + numReaderThreads); + } + + numTables = 1; + if (cmd.hasOption(NUM_TABLES)) { + numTables = parseInt(cmd.getOptionValue(NUM_TABLES), 1, Short.MAX_VALUE); + } + + numRegionsPerServer = DEFAULT_NUM_REGIONS_PER_SERVER; + if (cmd.hasOption(OPT_NUM_REGIONS_PER_SERVER)) { + numRegionsPerServer = Integer.parseInt(cmd.getOptionValue(OPT_NUM_REGIONS_PER_SERVER)); + } + + regionReplication = 1; + if (cmd.hasOption(OPT_REGION_REPLICATION)) { + regionReplication = Integer.parseInt(cmd.getOptionValue(OPT_REGION_REPLICATION)); + } + + regionReplicaId = -1; + if (cmd.hasOption(OPT_REGION_REPLICA_ID)) { + regionReplicaId = Integer.parseInt(cmd.getOptionValue(OPT_REGION_REPLICA_ID)); + } + } + + private void parseColumnFamilyOptions(CommandLine cmd) { + String dataBlockEncodingStr = cmd.getOptionValue(HFileTestUtil.OPT_DATA_BLOCK_ENCODING); + dataBlockEncodingAlgo = dataBlockEncodingStr == null ? null : + DataBlockEncoding.valueOf(dataBlockEncodingStr); + + String compressStr = cmd.getOptionValue(OPT_COMPRESSION); + compressAlgo = compressStr == null ? Compression.Algorithm.NONE : + Compression.Algorithm.valueOf(compressStr); + + String bloomStr = cmd.getOptionValue(OPT_BLOOM); + bloomType = bloomStr == null ? 
BloomType.ROW : + BloomType.valueOf(bloomStr); + + inMemoryCF = cmd.hasOption(OPT_INMEMORY); + if (cmd.hasOption(OPT_ENCRYPTION)) { + cipher = Encryption.getCipher(conf, cmd.getOptionValue(OPT_ENCRYPTION)); + } + + } + + public void initTestTable() throws IOException { + Durability durability = Durability.USE_DEFAULT; + if (deferredLogFlush) { + durability = Durability.ASYNC_WAL; + } + + HBaseTestingUtility.createPreSplitLoadTestTable(conf, tableName, + getColumnFamilies(), compressAlgo, dataBlockEncodingAlgo, numRegionsPerServer, + regionReplication, durability); + applyColumnFamilyOptions(tableName, getColumnFamilies()); + } + + @Override + protected int doWork() throws IOException { + if (numTables > 1) { + return parallelLoadTables(); + } else { + return loadTable(); + } + } + + protected int loadTable() throws IOException { + if (cmd.hasOption(OPT_ZK_QUORUM)) { + conf.set(HConstants.ZOOKEEPER_QUORUM, cmd.getOptionValue(OPT_ZK_QUORUM)); + } + if (cmd.hasOption(OPT_ZK_PARENT_NODE)) { + conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, cmd.getOptionValue(OPT_ZK_PARENT_NODE)); + } + + if (isInitOnly) { + LOG.info("Initializing only; no reads or writes"); + initTestTable(); + return 0; + } + + if (!isSkipInit) { + initTestTable(); + } + LoadTestDataGenerator dataGen = null; + if (cmd.hasOption(OPT_GENERATOR)) { + String[] clazzAndArgs = cmd.getOptionValue(OPT_GENERATOR).split(COLON); + dataGen = getLoadGeneratorInstance(clazzAndArgs[0]); + String[] args; + if (dataGen instanceof LoadTestDataGeneratorWithACL) { + LOG.info("Using LoadTestDataGeneratorWithACL"); + if (User.isHBaseSecurityEnabled(conf)) { + LOG.info("Security is enabled"); + authnFileName = clazzAndArgs[1]; + superUser = clazzAndArgs[2]; + userNames = clazzAndArgs[3]; + args = Arrays.copyOfRange(clazzAndArgs, 2, clazzAndArgs.length); + Properties authConfig = new Properties(); + authConfig.load(this.getClass().getClassLoader().getResourceAsStream(authnFileName)); + try { + addAuthInfoToConf(authConfig, conf, superUser, userNames); + } catch (IOException exp) { + LOG.error(exp); + return EXIT_FAILURE; + } + userOwner = User.create(HBaseKerberosUtils.loginAndReturnUGI(conf, superUser)); + } else { + superUser = clazzAndArgs[1]; + userNames = clazzAndArgs[2]; + args = Arrays.copyOfRange(clazzAndArgs, 1, clazzAndArgs.length); + userOwner = User.createUserForTesting(conf, superUser, new String[0]); + } + } else { + args = clazzAndArgs.length == 1 ? new String[0] : Arrays.copyOfRange(clazzAndArgs, 1, + clazzAndArgs.length); + } + dataGen.initialize(args); + } else { + // Default DataGenerator is MultiThreadedAction.DefaultDataGenerator + dataGen = new MultiThreadedAction.DefaultDataGenerator(minColDataSize, maxColDataSize, + minColsPerKey, maxColsPerKey, families); + } + + if (userOwner != null) { + LOG.info("Granting permissions for user " + userOwner.getShortName()); + Permission.Action[] actions = { + Permission.Action.ADMIN, Permission.Action.CREATE, + Permission.Action.READ, Permission.Action.WRITE }; + try { + AccessControlClient.grant(ConnectionFactory.createConnection(conf), + tableName, userOwner.getShortName(), null, null, actions); + } catch (Throwable e) { + LOG.fatal("Error in granting permission for the user " + userOwner.getShortName(), e); + return EXIT_FAILURE; + } + } + + if (userNames != null) { + // This will be comma separated list of expressions. 
+ String users[] = userNames.split(","); + User user = null; + for (String userStr : users) { + if (User.isHBaseSecurityEnabled(conf)) { + user = User.create(HBaseKerberosUtils.loginAndReturnUGI(conf, userStr)); + } else { + user = User.createUserForTesting(conf, userStr, new String[0]); + } + } + } + + if (isWrite) { + if (userOwner != null) { + writerThreads = new MultiThreadedWriterWithACL(dataGen, conf, tableName, userOwner); + } else { + String writerClass = null; + if (cmd.hasOption(OPT_WRITER)) { + writerClass = cmd.getOptionValue(OPT_WRITER); + } else { + writerClass = MultiThreadedWriter.class.getCanonicalName(); + } + + writerThreads = getMultiThreadedWriterInstance(writerClass, dataGen); + } + writerThreads.setMultiPut(isMultiPut); + } + + if (isUpdate) { + if (userOwner != null) { + updaterThreads = new MultiThreadedUpdaterWithACL(dataGen, conf, tableName, updatePercent, + userOwner, userNames); + } else { + String updaterClass = null; + if (cmd.hasOption(OPT_UPDATER)) { + updaterClass = cmd.getOptionValue(OPT_UPDATER); + } else { + updaterClass = MultiThreadedUpdater.class.getCanonicalName(); + } + updaterThreads = getMultiThreadedUpdaterInstance(updaterClass, dataGen); + } + updaterThreads.setBatchUpdate(isBatchUpdate); + updaterThreads.setIgnoreNonceConflicts(ignoreConflicts); + } + + if (isRead) { + if (userOwner != null) { + readerThreads = new MultiThreadedReaderWithACL(dataGen, conf, tableName, verifyPercent, + userNames); + } else { + String readerClass = null; + if (cmd.hasOption(OPT_READER)) { + readerClass = cmd.getOptionValue(OPT_READER); + } else { + readerClass = MultiThreadedReader.class.getCanonicalName(); + } + readerThreads = getMultiThreadedReaderInstance(readerClass, dataGen); + } + readerThreads.setMaxErrors(maxReadErrors); + readerThreads.setKeyWindow(keyWindow); + readerThreads.setMultiGetBatchSize(multiGetBatchSize); + readerThreads.setRegionReplicaId(regionReplicaId); + } + + if (isUpdate && isWrite) { + LOG.info("Concurrent write/update workload: making updaters aware of the " + + "write point"); + updaterThreads.linkToWriter(writerThreads); + } + + if (isRead && (isUpdate || isWrite)) { + LOG.info("Concurrent write/read workload: making readers aware of the " + + "write point"); + readerThreads.linkToWriter(isUpdate ? updaterThreads : writerThreads); + } + + if (isWrite) { + System.out.println("Starting to write data..."); + writerThreads.start(startKey, endKey, numWriterThreads); + } + + if (isUpdate) { + LOG.info("Starting to mutate data..."); + System.out.println("Starting to mutate data..."); + // TODO : currently append and increment operations not tested with tags + // Will update this aftet it is done + updaterThreads.start(startKey, endKey, numUpdaterThreads); + } + + if (isRead) { + System.out.println("Starting to read data..."); + readerThreads.start(startKey, endKey, numReaderThreads); + } + + if (isWrite) { + writerThreads.waitForFinish(); + } + + if (isUpdate) { + updaterThreads.waitForFinish(); + } + + if (isRead) { + readerThreads.waitForFinish(); + } + + boolean success = true; + if (isWrite) { + success = success && writerThreads.getNumWriteFailures() == 0; + } + if (isUpdate) { + success = success && updaterThreads.getNumWriteFailures() == 0; + } + if (isRead) { + success = success && readerThreads.getNumReadErrors() == 0 + && readerThreads.getNumReadFailures() == 0; + } + return success ? 
EXIT_SUCCESS : EXIT_FAILURE; + } + + private LoadTestDataGenerator getLoadGeneratorInstance(String clazzName) throws IOException { + try { + Class<?> clazz = Class.forName(clazzName); + Constructor<?> constructor = clazz.getConstructor(int.class, int.class, int.class, int.class, + byte[][].class); + return (LoadTestDataGenerator) constructor.newInstance(minColDataSize, maxColDataSize, + minColsPerKey, maxColsPerKey, families); + } catch (Exception e) { + throw new IOException(e); + } + } + + private MultiThreadedWriter getMultiThreadedWriterInstance(String clazzName + , LoadTestDataGenerator dataGen) throws IOException { + try { + Class<?> clazz = Class.forName(clazzName); + Constructor<?> constructor = clazz.getConstructor( + LoadTestDataGenerator.class, Configuration.class, TableName.class); + return (MultiThreadedWriter) constructor.newInstance(dataGen, conf, tableName); + } catch (Exception e) { + throw new IOException(e); + } + } + + private MultiThreadedUpdater getMultiThreadedUpdaterInstance(String clazzName + , LoadTestDataGenerator dataGen) throws IOException { + try { + Class<?> clazz = Class.forName(clazzName); + Constructor<?> constructor = clazz.getConstructor( + LoadTestDataGenerator.class, Configuration.class, TableName.class, double.class); + return (MultiThreadedUpdater) constructor.newInstance( + dataGen, conf, tableName, updatePercent); + } catch (Exception e) { + throw new IOException(e); + } + } + + private MultiThreadedReader getMultiThreadedReaderInstance(String clazzName + , LoadTestDataGenerator dataGen) throws IOException { + try { + Class<?> clazz = Class.forName(clazzName); + Constructor<?> constructor = clazz.getConstructor( + LoadTestDataGenerator.class, Configuration.class, TableName.class, double.class); + return (MultiThreadedReader) constructor.newInstance(dataGen, conf, tableName, verifyPercent); + } catch (Exception e) { + throw new IOException(e); + } + } + + public static void main(String[] args) { + new LoadTestTool().doStaticMain(args); + } + + /** + * When NUM_TABLES is specified, the function starts multiple worker threads + * which individually start a LoadTestTool instance to load a table. Each + * table name is in format <tn>_<index>. 
For example, "-tn test -num_tables 2" + * , table names will be "test_1", "test_2" + * + * @throws IOException + */ + private int parallelLoadTables() + throws IOException { + // create new command args + String tableName = cmd.getOptionValue(OPT_TABLE_NAME, DEFAULT_TABLE_NAME); + String[] newArgs = null; + if (!cmd.hasOption(LoadTestTool.OPT_TABLE_NAME)) { + newArgs = new String[cmdLineArgs.length + 2]; + newArgs[0] = "-" + LoadTestTool.OPT_TABLE_NAME; + newArgs[1] = LoadTestTool.DEFAULT_TABLE_NAME; + System.arraycopy(cmdLineArgs, 0, newArgs, 2, cmdLineArgs.length); + } else { + newArgs = cmdLineArgs; + } + + int tableNameValueIndex = -1; + for (int j = 0; j < newArgs.length; j++) { + if (newArgs[j].endsWith(OPT_TABLE_NAME)) { + tableNameValueIndex = j + 1; + } else if (newArgs[j].endsWith(NUM_TABLES)) { + // change NUM_TABLES to 1 so that each worker loads one table + newArgs[j + 1] = "1"; + } + } + + // starting to load multiple tables + List<WorkerThread> workers = new ArrayList<>(); + for (int i = 0; i < numTables; i++) { + String[] workerArgs = newArgs.clone(); + workerArgs[tableNameValueIndex] = tableName + "_" + (i+1); + WorkerThread worker = new WorkerThread(i, workerArgs); + workers.add(worker); + LOG.info(worker + " starting"); + worker.start(); + } + + // wait for all workers finish + LOG.info("Waiting for worker threads to finish"); + for (WorkerThread t : workers) { + try { + t.join(); + } catch (InterruptedException ie) { + IOException iie = new InterruptedIOException(); + iie.initCause(ie); + throw iie; + } + checkForErrors(); + } + + return EXIT_SUCCESS; + } + + // If an exception is thrown by one of worker threads, it will be + // stored here. + protected AtomicReference<Throwable> thrown = new AtomicReference<>(); + + private void workerThreadError(Throwable t) { + thrown.compareAndSet(null, t); + } + + /** + * Check for errors in the writer threads. If any is found, rethrow it. + */ + private void checkForErrors() throws IOException { + Throwable thrown = this.thrown.get(); + if (thrown == null) return; + if (thrown instanceof IOException) { + throw (IOException) thrown; + } else { + throw new RuntimeException(thrown); + } + } + + class WorkerThread extends Thread { + private String[] workerArgs; + + WorkerThread(int i, String[] args) { + super("WorkerThread-" + i); + workerArgs = args; + } + + @Override + public void run() { + try { + int ret = ToolRunner.run(HBaseConfiguration.create(), new LoadTestTool(), workerArgs); + if (ret != 0) { + throw new RuntimeException("LoadTestTool exit with non-zero return code."); + } + } catch (Exception ex) { + LOG.error("Error in worker thread", ex); + workerThreadError(ex); + } + } + } + + private void addAuthInfoToConf(Properties authConfig, Configuration conf, String owner, + String userList) throws IOException { + List<String> users = new ArrayList(Arrays.asList(userList.split(","))); + users.add(owner); + for (String user : users) { + String keyTabFileConfKey = "hbase." + user + ".keytab.file"; + String principalConfKey = "hbase." + user + ".kerberos.principal"; + if (!authConfig.containsKey(keyTabFileConfKey) || !authConfig.containsKey(principalConfKey)) { + throw new IOException("Authentication configs missing for user : " + user); + } + } + for (String key : authConfig.stringPropertyNames()) { + conf.set(key, authConfig.getProperty(key)); + } + LOG.debug("Added authentication properties to config successfully."); + } + +}
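For reference, the options defined above can also be exercised programmatically, the same way WorkerThread.run() drives a per-table instance through ToolRunner. The following is an illustrative sketch only, not part of the patch: the option values are made up, while the "-write" and "-read" formats follow OPT_USAGE_LOAD ("<avg_cols_per_key>:<avg_data_size>[:<#threads>]") and OPT_USAGE_READ ("<verify_percent>[:<#threads>]") shown earlier, and the ToolRunner invocation mirrors the one in WorkerThread.

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.util.LoadTestTool;
import org.apache.hadoop.util.ToolRunner;

public class LoadTestToolExample {
  public static void main(String[] args) throws Exception {
    // Hypothetical arguments; table name and key count are examples.
    String[] toolArgs = new String[] {
        "-tn", "cluster_test",   // table name (DEFAULT_TABLE_NAME if omitted)
        "-num_keys", "100000",   // required unless -init_only is given
        "-write", "2:64:20",     // ~2 cols per key, ~64-byte values, 20 writer threads
        "-read", "100:20"        // verify 100% of keys with 20 reader threads
    };
    // Same pattern WorkerThread.run() uses for each table.
    int ret = ToolRunner.run(HBaseConfiguration.create(), new LoadTestTool(), toolArgs);
    System.exit(ret);
  }
}

When -num_tables is greater than 1, parallelLoadTables() builds an equivalent argument array per worker, rewriting the -tn value to <tn>_<index> and forcing -num_tables to 1 so that each worker loads a single table.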
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/test/resources/hbase-site.xml ---------------------------------------------------------------------- diff --git a/hbase-mapreduce/src/test/resources/hbase-site.xml b/hbase-mapreduce/src/test/resources/hbase-site.xml new file mode 100644 index 0000000..64a1964 --- /dev/null +++ b/hbase-mapreduce/src/test/resources/hbase-site.xml @@ -0,0 +1,161 @@ +<?xml version="1.0"?> +<?xml-stylesheet type="text/xsl" href="configuration.xsl"?> +<!-- +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +--> +<configuration> + <property> + <name>hbase.regionserver.msginterval</name> + <value>1000</value> + <description>Interval between messages from the RegionServer to HMaster + in milliseconds. Default is 15. Set this value low if you want unit + tests to be responsive. + </description> + </property> + <property> + <name>hbase.defaults.for.version.skip</name> + <value>true</value> + </property> + <property> + <name>hbase.server.thread.wakefrequency</name> + <value>1000</value> + <description>Time to sleep in between searches for work (in milliseconds). + Used as sleep interval by service threads such as hbase:meta scanner and log roller. + </description> + </property> + <property> + <name>hbase.master.event.waiting.time</name> + <value>50</value> + <description>Time to sleep between checks to see if a table event took place. + </description> + </property> + <property> + <name>hbase.regionserver.handler.count</name> + <value>5</value> + </property> + <property> + <name>hbase.regionserver.metahandler.count</name> + <value>6</value> + </property> + <property> + <name>hbase.ipc.server.read.threadpool.size</name> + <value>3</value> + </property> + <property> + <name>hbase.master.info.port</name> + <value>-1</value> + <description>The port for the hbase master web UI + Set to -1 if you do not want the info server to run. + </description> + </property> + <property> + <name>hbase.master.port</name> + <value>0</value> + <description>Always have masters and regionservers come up on port '0' so we don't clash over + default ports. + </description> + </property> + <property> + <name>hbase.regionserver.port</name> + <value>0</value> + <description>Always have masters and regionservers come up on port '0' so we don't clash over + default ports. + </description> + </property> + <property> + <name>hbase.ipc.client.fallback-to-simple-auth-allowed</name> + <value>true</value> + </property> + + <property> + <name>hbase.regionserver.info.port</name> + <value>-1</value> + <description>The port for the hbase regionserver web UI + Set to -1 if you do not want the info server to run. 
+ </description> + </property> + <property> + <name>hbase.regionserver.info.port.auto</name> + <value>true</value> + <description>Info server auto port bind. Enables automatic port + search if hbase.regionserver.info.port is already in use. + Enabled for testing to run multiple tests on one machine. + </description> + </property> + <property> + <name>hbase.regionserver.safemode</name> + <value>false</value> + <description> + Turn on/off safe mode in region server. Always on for production, always off + for tests. + </description> + </property> + <property> + <name>hbase.hregion.max.filesize</name> + <value>67108864</value> + <description> + Maximum desired file size for an HRegion. If filesize exceeds + value + (value / 2), the HRegion is split in two. Default: 256M. + + Keep the maximum filesize small so we split more often in tests. + </description> + </property> + <property> + <name>hadoop.log.dir</name> + <value>${user.dir}/../logs</value> + </property> + <property> + <name>hbase.zookeeper.property.clientPort</name> + <value>21818</value> + <description>Property from ZooKeeper's config zoo.cfg. + The port at which the clients will connect. + </description> + </property> + <property> + <name>hbase.defaults.for.version.skip</name> + <value>true</value> + <description> + Set to true to skip the 'hbase.defaults.for.version'. + Setting this to true can be useful in contexts other than + the other side of a maven generation; i.e. running in an + ide. You'll want to set this boolean to true to avoid + seeing the RuntimeException complaint: "hbase-default.xml file + seems to be for and old version of HBase (@@@VERSION@@@), this + version is X.X.X-SNAPSHOT" + </description> + </property> + <property> + <name>hbase.table.sanity.checks</name> + <value>false</value> + <description>Skip sanity checks in tests + </description> + </property> + <property> + <name>hbase.procedure.fail.on.corruption</name> + <value>true</value> + <description> + Enable replay sanity checks on procedure tests. + </description> + </property> + <property> + <name>hbase.hconnection.threads.keepalivetime</name> + <value>3</value> + </property> +</configuration> http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/test/resources/hbase-site2.xml ---------------------------------------------------------------------- diff --git a/hbase-mapreduce/src/test/resources/hbase-site2.xml b/hbase-mapreduce/src/test/resources/hbase-site2.xml new file mode 100644 index 0000000..8bef31a --- /dev/null +++ b/hbase-mapreduce/src/test/resources/hbase-site2.xml @@ -0,0 +1,146 @@ +<?xml version="1.0"?> +<?xml-stylesheet type="text/xsl" href="configuration.xsl"?> +<!-- +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +--> +<configuration> + <property> + <name>hbase.custom.config</name> + <value>1000</value> + </property> + <property> + <name>hbase.regionserver.msginterval</name> + <value>1000</value> + <description>Interval between messages from the RegionServer to HMaster + in milliseconds. Default is 15. Set this value low if you want unit + tests to be responsive. + </description> + </property> + <property> + <name>hbase.defaults.for.version.skip</name> + <value>true</value> + </property> + <property> + <name>hbase.server.thread.wakefrequency</name> + <value>1000</value> + <description>Time to sleep in between searches for work (in milliseconds). + Used as sleep interval by service threads such as hbase:meta scanner and log roller. + </description> + </property> + <property> + <name>hbase.master.event.waiting.time</name> + <value>50</value> + <description>Time to sleep between checks to see if a table event took place. + </description> + </property> + <property> + <name>hbase.regionserver.handler.count</name> + <value>5</value> + </property> + <property> + <name>hbase.master.info.port</name> + <value>-1</value> + <description>The port for the hbase master web UI + Set to -1 if you do not want the info server to run. + </description> + </property> + <property> + <name>hbase.master.port</name> + <value>0</value> + <description>Always have masters and regionservers come up on port '0' so we don't clash over + default ports. + </description> + </property> + <property> + <name>hbase.regionserver.port</name> + <value>0</value> + <description>Always have masters and regionservers come up on port '0' so we don't clash over + default ports. + </description> + </property> + <property> + <name>hbase.ipc.client.fallback-to-simple-auth-allowed</name> + <value>true</value> + </property> + + <property> + <name>hbase.regionserver.info.port</name> + <value>-1</value> + <description>The port for the hbase regionserver web UI + Set to -1 if you do not want the info server to run. + </description> + </property> + <property> + <name>hbase.regionserver.info.port.auto</name> + <value>true</value> + <description>Info server auto port bind. Enables automatic port + search if hbase.regionserver.info.port is already in use. + Enabled for testing to run multiple tests on one machine. + </description> + </property> + <property> + <name>hbase.regionserver.safemode</name> + <value>false</value> + <description> + Turn on/off safe mode in region server. Always on for production, always off + for tests. + </description> + </property> + <property> + <name>hbase.hregion.max.filesize</name> + <value>67108864</value> + <description> + Maximum desired file size for an HRegion. If filesize exceeds + value + (value / 2), the HRegion is split in two. Default: 256M. + + Keep the maximum filesize small so we split more often in tests. + </description> + </property> + <property> + <name>hadoop.log.dir</name> + <value>${user.dir}/../logs</value> + </property> + <property> + <name>hbase.zookeeper.property.clientPort</name> + <value>21818</value> + <description>Property from ZooKeeper's config zoo.cfg. + The port at which the clients will connect. + </description> + </property> + <property> + <name>hbase.defaults.for.version.skip</name> + <value>true</value> + <description> + Set to true to skip the 'hbase.defaults.for.version'. + Setting this to true can be useful in contexts other than + the other side of a maven generation; i.e. running in an + ide. 
You'll want to set this boolean to true to avoid + seeing the RuntimeException complaint: "hbase-default.xml file + seems to be for and old version of HBase (@@@VERSION@@@), this + version is X.X.X-SNAPSHOT" + </description> + </property> + <property> + <name>hbase.table.sanity.checks</name> + <value>false</value> + <description>Skip sanity checks in tests + </description> + </property> +</configuration> http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/test/resources/hdfs-site.xml ---------------------------------------------------------------------- diff --git a/hbase-mapreduce/src/test/resources/hdfs-site.xml b/hbase-mapreduce/src/test/resources/hdfs-site.xml new file mode 100644 index 0000000..03be0c7 --- /dev/null +++ b/hbase-mapreduce/src/test/resources/hdfs-site.xml @@ -0,0 +1,32 @@ +<?xml version="1.0"?> +<?xml-stylesheet type="text/xsl" href="configuration.xsl"?> +<!-- +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +--> +<configuration> + + <!-- hadoop-2.0.5+'s HDFS-4305 by default enforces a min blocks size + of 1024*1024. Many unit tests that use the hlog use smaller + blocks. Setting this config to 0 to have tests pass --> + <property> + <name>dfs.namenode.fs-limits.min-block-size</name> + <value>0</value> + </property> +</configuration> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/hbase-mapreduce/src/test/resources/log4j.properties b/hbase-mapreduce/src/test/resources/log4j.properties new file mode 100644 index 0000000..c322699 --- /dev/null +++ b/hbase-mapreduce/src/test/resources/log4j.properties @@ -0,0 +1,68 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Define some default values that can be overridden by system properties +hbase.root.logger=INFO,console +hbase.log.dir=. +hbase.log.file=hbase.log + +# Define the root logger to the system property "hbase.root.logger". 
+log4j.rootLogger=${hbase.root.logger} + +# Logging Threshold +log4j.threshold=ALL + +# +# Daily Rolling File Appender +# +log4j.appender.DRFA=org.apache.log4j.DailyRollingFileAppender +log4j.appender.DRFA.File=${hbase.log.dir}/${hbase.log.file} + +# Rollver at midnight +log4j.appender.DRFA.DatePattern=.yyyy-MM-dd + +# 30-day backup +#log4j.appender.DRFA.MaxBackupIndex=30 +log4j.appender.DRFA.layout=org.apache.log4j.PatternLayout +# Debugging Pattern format +log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %-5p [%t] %C{2}(%L): %m%n + + +# +# console +# Add "console" to rootlogger above if you want to use this +# +log4j.appender.console=org.apache.log4j.ConsoleAppender +log4j.appender.console.target=System.err +log4j.appender.console.layout=org.apache.log4j.PatternLayout +log4j.appender.console.layout.ConversionPattern=%d{ISO8601} %-5p [%t] %C{2}(%L): %m%n + +# Custom Logging levels + +#log4j.logger.org.apache.hadoop.fs.FSNamesystem=DEBUG + +log4j.logger.org.apache.hadoop=WARN +log4j.logger.org.apache.zookeeper=ERROR +log4j.logger.org.apache.hadoop.hbase=DEBUG + +#These settings are workarounds against spurious logs from the minicluster. +#See HBASE-4709 +log4j.logger.org.apache.hadoop.metrics2.impl.MetricsConfig=WARN +log4j.logger.org.apache.hadoop.metrics2.impl.MetricsSinkAdapter=WARN +log4j.logger.org.apache.hadoop.metrics2.impl.MetricsSystemImpl=WARN +log4j.logger.org.apache.hadoop.metrics2.util.MBeans=WARN +# Enable this to get detailed connection error/retry logging. +# log4j.logger.org.apache.hadoop.hbase.client.ConnectionImplementation=TRACE http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/test/resources/mapred-queues.xml ---------------------------------------------------------------------- diff --git a/hbase-mapreduce/src/test/resources/mapred-queues.xml b/hbase-mapreduce/src/test/resources/mapred-queues.xml new file mode 100644 index 0000000..43f3e2a --- /dev/null +++ b/hbase-mapreduce/src/test/resources/mapred-queues.xml @@ -0,0 +1,75 @@ +<?xml version="1.0"?> +<!-- +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +--> +<!-- This is the template for queue configuration. The format supports nesting of + queues within queues - a feature called hierarchical queues. All queues are + defined within the 'queues' tag which is the top level element for this + XML document. + The 'aclsEnabled' attribute should be set to true, if ACLs should be checked + on queue operations such as submitting jobs, killing jobs etc. --> +<queues aclsEnabled="false"> + + <!-- Configuration for a queue is specified by defining a 'queue' element. --> + <queue> + + <!-- Name of a queue. 
Queue name cannot contain a ':' --> + <name>default</name> + + <!-- properties for a queue, typically used by schedulers, + can be defined here --> + <properties> + </properties> + + <!-- State of the queue. If running, the queue will accept new jobs. + If stopped, the queue will not accept new jobs. --> + <state>running</state> + + <!-- Specifies the ACLs to check for submitting jobs to this queue. + If set to '*', it allows all users to submit jobs to the queue. + For specifying a list of users and groups the format to use is + user1,user2 group1,group2 --> + <acl-submit-job>*</acl-submit-job> + + <!-- Specifies the ACLs to check for modifying jobs in this queue. + Modifications include killing jobs, tasks of jobs or changing + priorities. + If set to '*', it allows all users to submit jobs to the queue. + For specifying a list of users and groups the format to use is + user1,user2 group1,group2 --> + <acl-administer-jobs>*</acl-administer-jobs> + </queue> + + <!-- Here is a sample of a hierarchical queue configuration + where q2 is a child of q1. In this example, q2 is a leaf level + queue as it has no queues configured within it. Currently, ACLs + and state are only supported for the leaf level queues. + Note also the usage of properties for the queue q2. + <queue> + <name>q1</name> + <queue> + <name>q2</name> + <properties> + <property key="capacity" value="20"/> + <property key="user-limit" value="30"/> + </properties> + </queue> + </queue> + --> +</queues> http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/test/resources/mapred-site.xml ---------------------------------------------------------------------- diff --git a/hbase-mapreduce/src/test/resources/mapred-site.xml b/hbase-mapreduce/src/test/resources/mapred-site.xml new file mode 100644 index 0000000..787ffb7 --- /dev/null +++ b/hbase-mapreduce/src/test/resources/mapred-site.xml @@ -0,0 +1,34 @@ +<?xml version="1.0"?> +<?xml-stylesheet type="text/xsl" href="configuration.xsl"?> +<!-- +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +--> +<configuration> +<property> + <name>mapred.map.child.java.opts</name> + <value>-Djava.awt.headless=true</value> +</property> + +<property> + <name>mapred.reduce.child.java.opts</name> + <value>-Djava.awt.headless=true</value> +</property> +</configuration> + http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/test/resources/org/apache/hadoop/hbase/PerformanceEvaluation_Counter.properties ---------------------------------------------------------------------- diff --git a/hbase-mapreduce/src/test/resources/org/apache/hadoop/hbase/PerformanceEvaluation_Counter.properties b/hbase-mapreduce/src/test/resources/org/apache/hadoop/hbase/PerformanceEvaluation_Counter.properties new file mode 100644 index 0000000..6fca96a --- /dev/null +++ b/hbase-mapreduce/src/test/resources/org/apache/hadoop/hbase/PerformanceEvaluation_Counter.properties @@ -0,0 +1,28 @@ +# ResourceBundle properties file for Map-Reduce counters + +#/** +# * Licensed to the Apache Software Foundation (ASF) under one +# * or more contributor license agreements. See the NOTICE file +# * distributed with this work for additional information +# * regarding copyright ownership. The ASF licenses this file +# * to you under the Apache License, Version 2.0 (the +# * "License"); you may not use this file except in compliance +# * with the License. You may obtain a copy of the License at +# * +# * http://www.apache.org/licenses/LICENSE-2.0 +# * +# * Unless required by applicable law or agreed to in writing, software +# * distributed under the License is distributed on an "AS IS" BASIS, +# * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# * See the License for the specific language governing permissions and +# * limitations under the License. 
+# */ + +CounterGroupName= HBase Performance Evaluation +ELAPSED_TIME.name= Elapsed time in milliseconds +ROWS.name= Row count +# ResourceBundle properties file for Map-Reduce counters + +CounterGroupName= HBase Performance Evaluation +ELAPSED_TIME.name= Elapsed time in milliseconds +ROWS.name= Row count http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/test/resources/org/apache/hadoop/hbase/mapreduce/exportedTableIn94Format ---------------------------------------------------------------------- diff --git a/hbase-mapreduce/src/test/resources/org/apache/hadoop/hbase/mapreduce/exportedTableIn94Format b/hbase-mapreduce/src/test/resources/org/apache/hadoop/hbase/mapreduce/exportedTableIn94Format new file mode 100755 index 0000000..762ddd7 Binary files /dev/null and b/hbase-mapreduce/src/test/resources/org/apache/hadoop/hbase/mapreduce/exportedTableIn94Format differ http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-rest/pom.xml ---------------------------------------------------------------------- diff --git a/hbase-rest/pom.xml b/hbase-rest/pom.xml index 3af9829..639c0c2 100644 --- a/hbase-rest/pom.xml +++ b/hbase-rest/pom.xml @@ -212,6 +212,16 @@ </dependency> <dependency> <groupId>org.apache.hbase</groupId> + <artifactId>hbase-mapreduce</artifactId> + </dependency> + <dependency> + <groupId>org.apache.hbase</groupId> + <artifactId>hbase-mapreduce</artifactId> + <type>test-jar</type> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.hbase</groupId> <artifactId>hbase-hadoop-compat</artifactId> </dependency> <dependency> http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-rest/src/test/java/org/apache/hadoop/hbase/rest/PerformanceEvaluation.java ---------------------------------------------------------------------- diff --git a/hbase-rest/src/test/java/org/apache/hadoop/hbase/rest/PerformanceEvaluation.java b/hbase-rest/src/test/java/org/apache/hadoop/hbase/rest/PerformanceEvaluation.java index 3559ee0..6ed170e 100644 --- a/hbase-rest/src/test/java/org/apache/hadoop/hbase/rest/PerformanceEvaluation.java +++ b/hbase-rest/src/test/java/org/apache/hadoop/hbase/rest/PerformanceEvaluation.java @@ -220,8 +220,8 @@ public class PerformanceEvaluation extends Configured implements Tool { /** * This class works as the InputSplit of Performance Evaluation - * MapReduce InputFormat, and the Record Value of RecordReader. - * Each map task will only read one record from a PeInputSplit, + * MapReduce InputFormat, and the Record Value of RecordReader. + * Each map task will only read one record from a PeInputSplit, * the record value is the PeInputSplit itself. 
*/ public static class PeInputSplit extends InputSplit implements Writable { @@ -950,7 +950,7 @@ public class PerformanceEvaluation extends Configured implements Tool { static abstract class TableTest extends Test { protected Table table; - + public TableTest(Configuration conf, TestOptions options, Status status) { super(conf, options, status); } http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-server/src/main/java/org/apache/hadoop/hbase/client/TableSnapshotScanner.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/TableSnapshotScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/TableSnapshotScanner.java index bcd433c..d520113 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/TableSnapshotScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/TableSnapshotScanner.java @@ -43,7 +43,7 @@ import org.apache.hadoop.hbase.util.FSUtils; * <p> * This also allows one to run the scan from an * online or offline hbase cluster. The snapshot files can be exported by using the - * {@link org.apache.hadoop.hbase.snapshot.ExportSnapshot} tool, + * org.apache.hadoop.hbase.snapshot.ExportSnapshot tool, * to a pure-hdfs cluster, and this scanner can be used to * run the scan directly over the snapshot files. The snapshot should not be deleted while there * are open scanners reading from snapshot files. @@ -60,7 +60,7 @@ import org.apache.hadoop.hbase.util.FSUtils; * snapshot files, the job has to be run as the HBase user or the user must have group or other * priviledges in the filesystem (See HBASE-8369). Note that, given other users access to read from * snapshot/data files will completely circumvent the access control enforced by HBase. - * @see org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat + * See org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat. */ @InterfaceAudience.Public public class TableSnapshotScanner extends AbstractClientScanner { http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/Driver.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/Driver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/Driver.java deleted file mode 100644 index 618c14a..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/Driver.java +++ /dev/null @@ -1,52 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.hbase.mapred; - -import org.apache.hadoop.hbase.HBaseInterfaceAudience; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.classification.InterfaceStability; -import org.apache.hadoop.util.ProgramDriver; - -import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting; - -/** - * Driver for hbase mapreduce jobs. Select which to run by passing name of job - * to this main. - */ -@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS) -@InterfaceStability.Stable -public class Driver { - - private static ProgramDriver pgd = new ProgramDriver(); - - @VisibleForTesting - static void setProgramDriver(ProgramDriver pgd0) { - pgd = pgd0; - } - - /** - * @param args - * @throws Throwable - */ - public static void main(String[] args) throws Throwable { - pgd.addClass(RowCounter.NAME, RowCounter.class, "Count rows in HBase table"); - ProgramDriver.class.getMethod("driver", new Class[] { String[].class }) - .invoke(pgd, new Object[] { args }); - } -} http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/GroupingTableMap.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/GroupingTableMap.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/GroupingTableMap.java deleted file mode 100644 index a534224..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/GroupingTableMap.java +++ /dev/null @@ -1,157 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.mapred; - -import java.io.IOException; -import java.util.ArrayList; - -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.CellUtil; -import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.MapReduceBase; -import org.apache.hadoop.mapred.OutputCollector; -import org.apache.hadoop.mapred.Reporter; - - -/** - * Extract grouping columns from input record - */ -@InterfaceAudience.Public -public class GroupingTableMap -extends MapReduceBase -implements TableMap<ImmutableBytesWritable,Result> { - - /** - * JobConf parameter to specify the columns used to produce the key passed to - * collect from the map phase - */ - public static final String GROUP_COLUMNS = - "hbase.mapred.groupingtablemap.columns"; - - protected byte [][] columns; - - /** - * Use this before submitting a TableMap job. 
It will appropriately set up the - * JobConf. - * - * @param table table to be processed - * @param columns space separated list of columns to fetch - * @param groupColumns space separated list of columns used to form the key - * used in collect - * @param mapper map class - * @param job job configuration object - */ - @SuppressWarnings("unchecked") - public static void initJob(String table, String columns, String groupColumns, - Class<? extends TableMap> mapper, JobConf job) { - - TableMapReduceUtil.initTableMapJob(table, columns, mapper, - ImmutableBytesWritable.class, Result.class, job); - job.set(GROUP_COLUMNS, groupColumns); - } - - @Override - public void configure(JobConf job) { - super.configure(job); - String[] cols = job.get(GROUP_COLUMNS, "").split(" "); - columns = new byte[cols.length][]; - for(int i = 0; i < cols.length; i++) { - columns[i] = Bytes.toBytes(cols[i]); - } - } - - /** - * Extract the grouping columns from value to construct a new key. - * - * Pass the new key and value to reduce. - * If any of the grouping columns are not found in the value, the record is skipped. - * @param key - * @param value - * @param output - * @param reporter - * @throws IOException - */ - public void map(ImmutableBytesWritable key, Result value, - OutputCollector<ImmutableBytesWritable,Result> output, - Reporter reporter) throws IOException { - - byte[][] keyVals = extractKeyValues(value); - if(keyVals != null) { - ImmutableBytesWritable tKey = createGroupKey(keyVals); - output.collect(tKey, value); - } - } - - /** - * Extract columns values from the current record. This method returns - * null if any of the columns are not found. - * - * Override this method if you want to deal with nulls differently. - * - * @param r - * @return array of byte values - */ - protected byte[][] extractKeyValues(Result r) { - byte[][] keyVals = null; - ArrayList<byte[]> foundList = new ArrayList<>(); - int numCols = columns.length; - if (numCols > 0) { - for (Cell value: r.listCells()) { - byte [] column = KeyValue.makeColumn(CellUtil.cloneFamily(value), - CellUtil.cloneQualifier(value)); - for (int i = 0; i < numCols; i++) { - if (Bytes.equals(column, columns[i])) { - foundList.add(CellUtil.cloneValue(value)); - break; - } - } - } - if(foundList.size() == numCols) { - keyVals = foundList.toArray(new byte[numCols][]); - } - } - return keyVals; - } - - /** - * Create a key by concatenating multiple column values. - * Override this function in order to produce different types of keys. 
- * - * @param vals - * @return key generated by concatenating multiple column values - */ - protected ImmutableBytesWritable createGroupKey(byte[][] vals) { - if(vals == null) { - return null; - } - StringBuilder sb = new StringBuilder(); - for(int i = 0; i < vals.length; i++) { - if(i > 0) { - sb.append(" "); - } - sb.append(Bytes.toString(vals[i])); - } - return new ImmutableBytesWritable(Bytes.toBytesBinary(sb.toString())); - } -} http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/HRegionPartitioner.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/HRegionPartitioner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/HRegionPartitioner.java deleted file mode 100644 index 0011a60..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/HRegionPartitioner.java +++ /dev/null @@ -1,96 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.mapred; - -import java.io.IOException; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.client.Connection; -import org.apache.hadoop.hbase.client.ConnectionFactory; -import org.apache.hadoop.hbase.client.RegionLocator; -import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.Partitioner; - - -/** - * This is used to partition the output keys into groups of keys. - * Keys are grouped according to the regions that currently exist - * so that each reducer fills a single region so load is distributed. - * - * @param <K2> - * @param <V2> - */ -@InterfaceAudience.Public -public class HRegionPartitioner<K2,V2> -implements Partitioner<ImmutableBytesWritable, V2> { - private static final Log LOG = LogFactory.getLog(HRegionPartitioner.class); - // Connection and locator are not cleaned up; they just die when partitioner is done. 
- private Connection connection; - private RegionLocator locator; - private byte[][] startKeys; - - public void configure(JobConf job) { - try { - this.connection = ConnectionFactory.createConnection(HBaseConfiguration.create(job)); - TableName tableName = TableName.valueOf(job.get(TableOutputFormat.OUTPUT_TABLE)); - this.locator = this.connection.getRegionLocator(tableName); - } catch (IOException e) { - LOG.error(e); - } - - try { - this.startKeys = this.locator.getStartKeys(); - } catch (IOException e) { - LOG.error(e); - } - } - - public int getPartition(ImmutableBytesWritable key, V2 value, int numPartitions) { - byte[] region = null; - // Only one region return 0 - if (this.startKeys.length == 1){ - return 0; - } - try { - // Not sure if this is cached after a split so we could have problems - // here if a region splits while mapping - region = locator.getRegionLocation(key.get()).getRegionInfo().getStartKey(); - } catch (IOException e) { - LOG.error(e); - } - for (int i = 0; i < this.startKeys.length; i++){ - if (Bytes.compareTo(region, this.startKeys[i]) == 0 ){ - if (i >= numPartitions-1){ - // cover if we have less reduces then regions. - return (Integer.toString(i).hashCode() - & Integer.MAX_VALUE) % numPartitions; - } - return i; - } - } - // if above fails to find start key that match we need to return something - return 0; - } -} http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/IdentityTableMap.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/IdentityTableMap.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/IdentityTableMap.java deleted file mode 100644 index dfacff9..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/IdentityTableMap.java +++ /dev/null @@ -1,76 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.mapred; - -import java.io.IOException; - -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.MapReduceBase; -import org.apache.hadoop.mapred.OutputCollector; -import org.apache.hadoop.mapred.Reporter; - -/** - * Pass the given key and record as-is to reduce - */ -@InterfaceAudience.Public -public class IdentityTableMap -extends MapReduceBase -implements TableMap<ImmutableBytesWritable, Result> { - - /** constructor */ - public IdentityTableMap() { - super(); - } - - /** - * Use this before submitting a TableMap job. It will - * appropriately set up the JobConf. 
- * - * @param table table name - * @param columns columns to scan - * @param mapper mapper class - * @param job job configuration - */ - @SuppressWarnings("unchecked") - public static void initJob(String table, String columns, - Class<? extends TableMap> mapper, JobConf job) { - TableMapReduceUtil.initTableMapJob(table, columns, mapper, - ImmutableBytesWritable.class, - Result.class, job); - } - - /** - * Pass the key, value to reduce - * @param key - * @param value - * @param output - * @param reporter - * @throws IOException - */ - public void map(ImmutableBytesWritable key, Result value, - OutputCollector<ImmutableBytesWritable,Result> output, - Reporter reporter) throws IOException { - - // convert - output.collect(key, value); - } -} http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/IdentityTableReduce.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/IdentityTableReduce.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/IdentityTableReduce.java deleted file mode 100644 index 9c2e604..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/IdentityTableReduce.java +++ /dev/null @@ -1,61 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.hbase.mapred; - -import java.io.IOException; -import java.util.Iterator; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.mapred.MapReduceBase; -import org.apache.hadoop.mapred.OutputCollector; -import org.apache.hadoop.mapred.Reporter; - -/** - * Write to table each key, record pair - */ -@InterfaceAudience.Public -public class IdentityTableReduce -extends MapReduceBase -implements TableReduce<ImmutableBytesWritable, Put> { - @SuppressWarnings("unused") - private static final Log LOG = - LogFactory.getLog(IdentityTableReduce.class.getName()); - - /** - * No aggregation, output pairs of (key, record) - * @param key - * @param values - * @param output - * @param reporter - * @throws IOException - */ - public void reduce(ImmutableBytesWritable key, Iterator<Put> values, - OutputCollector<ImmutableBytesWritable, Put> output, - Reporter reporter) - throws IOException { - - while(values.hasNext()) { - output.collect(key, values.next()); - } - } -} http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/MultiTableSnapshotInputFormat.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/MultiTableSnapshotInputFormat.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/MultiTableSnapshotInputFormat.java deleted file mode 100644 index 3e121fe..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/MultiTableSnapshotInputFormat.java +++ /dev/null @@ -1,128 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hadoop.hbase.mapred; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.hbase.mapreduce.MultiTableSnapshotInputFormatImpl; -import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl; -import org.apache.hadoop.mapred.InputFormat; -import org.apache.hadoop.mapred.InputSplit; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.RecordReader; -import org.apache.hadoop.mapred.Reporter; - -import java.io.IOException; -import java.util.Collection; -import java.util.List; -import java.util.Map; - -/** - * MultiTableSnapshotInputFormat generalizes {@link org.apache.hadoop.hbase.mapred - * .TableSnapshotInputFormat} - * allowing a MapReduce job to run over one or more table snapshots, with one or more scans - * configured for each. - * Internally, the input format delegates to {@link org.apache.hadoop.hbase.mapreduce - * .TableSnapshotInputFormat} - * and thus has the same performance advantages; see {@link org.apache.hadoop.hbase.mapreduce - * .TableSnapshotInputFormat} for - * more details. - * Usage is similar to TableSnapshotInputFormat, with the following exception: - * initMultiTableSnapshotMapperJob takes in a map - * from snapshot name to a collection of scans. For each snapshot in the map, each corresponding - * scan will be applied; - * the overall dataset for the job is defined by the concatenation of the regions and tables - * included in each snapshot/scan - * pair. - * {@link org.apache.hadoop.hbase.mapred.TableMapReduceUtil#initMultiTableSnapshotMapperJob(Map, - * Class, Class, Class, JobConf, boolean, Path)} - * can be used to configure the job. - * <pre>{@code - * Job job = new Job(conf); - * Map<String, Collection<Scan>> snapshotScans = ImmutableMap.of( - * "snapshot1", ImmutableList.of(new Scan(Bytes.toBytes("a"), Bytes.toBytes("b"))), - * "snapshot2", ImmutableList.of(new Scan(Bytes.toBytes("1"), Bytes.toBytes("2"))) - * ); - * Path restoreDir = new Path("/tmp/snapshot_restore_dir") - * TableMapReduceUtil.initTableSnapshotMapperJob( - * snapshotScans, MyTableMapper.class, MyMapKeyOutput.class, - * MyMapOutputValueWritable.class, job, true, restoreDir); - * } - * </pre> - * Internally, this input format restores each snapshot into a subdirectory of the given tmp - * directory. Input splits and - * record readers are created as described in {@link org.apache.hadoop.hbase.mapreduce - * .TableSnapshotInputFormat} - * (one per region). - * See {@link org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat} for more notes on - * permissioning; the - * same caveats apply here. 
- * - * @see org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat - * @see org.apache.hadoop.hbase.client.TableSnapshotScanner - */ -@InterfaceAudience.Public -public class MultiTableSnapshotInputFormat extends TableSnapshotInputFormat - implements InputFormat<ImmutableBytesWritable, Result> { - - private final MultiTableSnapshotInputFormatImpl delegate; - - public MultiTableSnapshotInputFormat() { - this.delegate = new MultiTableSnapshotInputFormatImpl(); - } - - @Override - public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException { - List<TableSnapshotInputFormatImpl.InputSplit> splits = delegate.getSplits(job); - InputSplit[] results = new InputSplit[splits.size()]; - for (int i = 0; i < splits.size(); i++) { - results[i] = new TableSnapshotRegionSplit(splits.get(i)); - } - return results; - } - - @Override - public RecordReader<ImmutableBytesWritable, Result> getRecordReader(InputSplit split, JobConf job, - Reporter reporter) throws IOException { - return new TableSnapshotRecordReader((TableSnapshotRegionSplit) split, job); - } - - /** - * Configure conf to read from snapshotScans, with snapshots restored to a subdirectory of - * restoreDir. - * Sets: {@link org.apache.hadoop.hbase.mapreduce - * .MultiTableSnapshotInputFormatImpl#RESTORE_DIRS_KEY}, - * {@link org.apache.hadoop.hbase.mapreduce - * .MultiTableSnapshotInputFormatImpl#SNAPSHOT_TO_SCANS_KEY} - * - * @param conf - * @param snapshotScans - * @param restoreDir - * @throws IOException - */ - public static void setInput(Configuration conf, Map<String, Collection<Scan>> snapshotScans, - Path restoreDir) throws IOException { - new MultiTableSnapshotInputFormatImpl().setInput(conf, snapshotScans, restoreDir); - } - -}
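
As a rough usage sketch (not part of this patch): the class javadoc above recommends configuring the mapred-API MultiTableSnapshotInputFormat through TableMapReduceUtil.initMultiTableSnapshotMapperJob(Map, Class, Class, Class, JobConf, boolean, Path), which in turn drives the setInput(...) shown at the end of the class. The snippet below only illustrates that call pattern; the driver class, snapshot names, and restore directory are hypothetical, and IdentityTableMap is reused as a convenient TableMap implementation.

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapred.IdentityTableMap;
import org.apache.hadoop.hbase.mapred.TableMapReduceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.lib.NullOutputFormat;

public class MultiSnapshotScanDriver {                        // hypothetical driver class
  public static void main(String[] args) throws Exception {
    JobConf job = new JobConf(HBaseConfiguration.create(), MultiSnapshotScanDriver.class);

    // One or more scans per snapshot; the job input is the union of every snapshot/scan pair.
    Map<String, Collection<Scan>> snapshotScans = new HashMap<>();
    Collection<Scan> scans1 = new ArrayList<>();
    scans1.add(new Scan(Bytes.toBytes("a"), Bytes.toBytes("b")));
    snapshotScans.put("snapshot1", scans1);                   // hypothetical snapshot name
    Collection<Scan> scans2 = new ArrayList<>();
    scans2.add(new Scan(Bytes.toBytes("1"), Bytes.toBytes("2")));
    snapshotScans.put("snapshot2", scans2);                   // hypothetical snapshot name

    // Snapshots are restored under this directory before the job reads them (path assumed).
    Path restoreDir = new Path("/tmp/snapshot_restore_dir");

    // Sets the input format, mapper, and the RESTORE_DIRS_KEY / SNAPSHOT_TO_SCANS_KEY
    // configuration described in the MultiTableSnapshotInputFormat javadoc above.
    TableMapReduceUtil.initMultiTableSnapshotMapperJob(
        snapshotScans, IdentityTableMap.class, ImmutableBytesWritable.class,
        Result.class, job, true, restoreDir);

    // Map-only job; discard output for the sake of the sketch.
    job.setNumReduceTasks(0);
    job.setOutputFormat(NullOutputFormat.class);

    JobClient.runJob(job);
  }
}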