http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java deleted file mode 100644 index ff458ff..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java +++ /dev/null @@ -1,1027 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.mapreduce; - -import java.io.File; -import java.io.IOException; -import java.net.URL; -import java.net.URLDecoder; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Enumeration; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.zip.ZipEntry; -import java.util.zip.ZipFile; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.MetaTableAccessor; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.client.Connection; -import org.apache.hadoop.hbase.client.ConnectionFactory; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; -import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; -import org.apache.hadoop.hbase.security.User; -import org.apache.hadoop.hbase.security.UserProvider; -import org.apache.hadoop.hbase.security.token.TokenUtil; -import org.apache.hadoop.hbase.util.Base64; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.zookeeper.ZKConfig; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.mapreduce.InputFormat; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.util.StringUtils; - -import com.codahale.metrics.MetricRegistry; - -/** - * Utility for {@link TableMapper} and {@link TableReducer} - */ -@SuppressWarnings({ "rawtypes", "unchecked" }) -@InterfaceAudience.Public -public class TableMapReduceUtil { - private static final Log LOG = LogFactory.getLog(TableMapReduceUtil.class); - - /** - * Use this before submitting a TableMap job. It will appropriately set up - * the job.
- * - * @param table The table name to read from. - * @param scan The scan instance with the columns, time range etc. - * @param mapper The mapper class to use. - * @param outputKeyClass The class of the output key. - * @param outputValueClass The class of the output value. - * @param job The current job to adjust. Make sure the passed job is - * carrying all necessary HBase configuration. - * @throws IOException When setting up the details fails. - */ - public static void initTableMapperJob(String table, Scan scan, - Class<? extends TableMapper> mapper, - Class<?> outputKeyClass, - Class<?> outputValueClass, Job job) - throws IOException { - initTableMapperJob(table, scan, mapper, outputKeyClass, outputValueClass, - job, true); - } - - - /** - * Use this before submitting a TableMap job. It will appropriately set up - * the job. - * - * @param table The table name to read from. - * @param scan The scan instance with the columns, time range etc. - * @param mapper The mapper class to use. - * @param outputKeyClass The class of the output key. - * @param outputValueClass The class of the output value. - * @param job The current job to adjust. Make sure the passed job is - * carrying all necessary HBase configuration. - * @throws IOException When setting up the details fails. - */ - public static void initTableMapperJob(TableName table, - Scan scan, - Class<? extends TableMapper> mapper, - Class<?> outputKeyClass, - Class<?> outputValueClass, - Job job) throws IOException { - initTableMapperJob(table.getNameAsString(), - scan, - mapper, - outputKeyClass, - outputValueClass, - job, - true); - } - - /** - * Use this before submitting a TableMap job. It will appropriately set up - * the job. - * - * @param table Binary representation of the table name to read from. - * @param scan The scan instance with the columns, time range etc. - * @param mapper The mapper class to use. - * @param outputKeyClass The class of the output key. - * @param outputValueClass The class of the output value. - * @param job The current job to adjust. Make sure the passed job is - * carrying all necessary HBase configuration. - * @throws IOException When setting up the details fails. - */ - public static void initTableMapperJob(byte[] table, Scan scan, - Class<? extends TableMapper> mapper, - Class<?> outputKeyClass, - Class<?> outputValueClass, Job job) - throws IOException { - initTableMapperJob(Bytes.toString(table), scan, mapper, outputKeyClass, outputValueClass, - job, true); - } - - /** - * Use this before submitting a TableMap job. It will appropriately set up - * the job. - * - * @param table The table name to read from. - * @param scan The scan instance with the columns, time range etc. - * @param mapper The mapper class to use. - * @param outputKeyClass The class of the output key. - * @param outputValueClass The class of the output value. - * @param job The current job to adjust. Make sure the passed job is - * carrying all necessary HBase configuration. - * @param addDependencyJars upload HBase jars and jars for any of the configured - * job classes via the distributed cache (tmpjars). - * @throws IOException When setting up the details fails. - */ - public static void initTableMapperJob(String table, Scan scan, - Class<? extends TableMapper> mapper, - Class<?> outputKeyClass, - Class<?> outputValueClass, Job job, - boolean addDependencyJars, Class<? 
extends InputFormat> inputFormatClass) - throws IOException { - initTableMapperJob(table, scan, mapper, outputKeyClass, outputValueClass, job, - addDependencyJars, true, inputFormatClass); - } - - - /** - * Use this before submitting a TableMap job. It will appropriately set up - * the job. - * - * @param table The table name to read from. - * @param scan The scan instance with the columns, time range etc. - * @param mapper The mapper class to use. - * @param outputKeyClass The class of the output key. - * @param outputValueClass The class of the output value. - * @param job The current job to adjust. Make sure the passed job is - * carrying all necessary HBase configuration. - * @param addDependencyJars upload HBase jars and jars for any of the configured - * job classes via the distributed cache (tmpjars). - * @param initCredentials whether to initialize hbase auth credentials for the job - * @param inputFormatClass the input format - * @throws IOException When setting up the details fails. - */ - public static void initTableMapperJob(String table, Scan scan, - Class<? extends TableMapper> mapper, - Class<?> outputKeyClass, - Class<?> outputValueClass, Job job, - boolean addDependencyJars, boolean initCredentials, - Class<? extends InputFormat> inputFormatClass) - throws IOException { - job.setInputFormatClass(inputFormatClass); - if (outputValueClass != null) job.setMapOutputValueClass(outputValueClass); - if (outputKeyClass != null) job.setMapOutputKeyClass(outputKeyClass); - job.setMapperClass(mapper); - if (Put.class.equals(outputValueClass)) { - job.setCombinerClass(PutCombiner.class); - } - Configuration conf = job.getConfiguration(); - HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf)); - conf.set(TableInputFormat.INPUT_TABLE, table); - conf.set(TableInputFormat.SCAN, convertScanToString(scan)); - conf.setStrings("io.serializations", conf.get("io.serializations"), - MutationSerialization.class.getName(), ResultSerialization.class.getName(), - KeyValueSerialization.class.getName()); - if (addDependencyJars) { - addDependencyJars(job); - } - if (initCredentials) { - initCredentials(job); - } - } - - /** - * Use this before submitting a TableMap job. It will appropriately set up - * the job. - * - * @param table Binary representation of the table name to read from. - * @param scan The scan instance with the columns, time range etc. - * @param mapper The mapper class to use. - * @param outputKeyClass The class of the output key. - * @param outputValueClass The class of the output value. - * @param job The current job to adjust. Make sure the passed job is - * carrying all necessary HBase configuration. - * @param addDependencyJars upload HBase jars and jars for any of the configured - * job classes via the distributed cache (tmpjars). - * @param inputFormatClass The class of the input format - * @throws IOException When setting up the details fails. - */ - public static void initTableMapperJob(byte[] table, Scan scan, - Class<? extends TableMapper> mapper, - Class<?> outputKeyClass, - Class<?> outputValueClass, Job job, - boolean addDependencyJars, Class<? extends InputFormat> inputFormatClass) - throws IOException { - initTableMapperJob(Bytes.toString(table), scan, mapper, outputKeyClass, - outputValueClass, job, addDependencyJars, inputFormatClass); - } - - /** - * Use this before submitting a TableMap job. It will appropriately set up - * the job. - * - * @param table Binary representation of the table name to read from. 
- * @param scan The scan instance with the columns, time range etc. - * @param mapper The mapper class to use. - * @param outputKeyClass The class of the output key. - * @param outputValueClass The class of the output value. - * @param job The current job to adjust. Make sure the passed job is - * carrying all necessary HBase configuration. - * @param addDependencyJars upload HBase jars and jars for any of the configured - * job classes via the distributed cache (tmpjars). - * @throws IOException When setting up the details fails. - */ - public static void initTableMapperJob(byte[] table, Scan scan, - Class<? extends TableMapper> mapper, - Class<?> outputKeyClass, - Class<?> outputValueClass, Job job, - boolean addDependencyJars) - throws IOException { - initTableMapperJob(Bytes.toString(table), scan, mapper, outputKeyClass, - outputValueClass, job, addDependencyJars, TableInputFormat.class); - } - - /** - * Use this before submitting a TableMap job. It will appropriately set up - * the job. - * - * @param table The table name to read from. - * @param scan The scan instance with the columns, time range etc. - * @param mapper The mapper class to use. - * @param outputKeyClass The class of the output key. - * @param outputValueClass The class of the output value. - * @param job The current job to adjust. Make sure the passed job is - * carrying all necessary HBase configuration. - * @param addDependencyJars upload HBase jars and jars for any of the configured - * job classes via the distributed cache (tmpjars). - * @throws IOException When setting up the details fails. - */ - public static void initTableMapperJob(String table, Scan scan, - Class<? extends TableMapper> mapper, - Class<?> outputKeyClass, - Class<?> outputValueClass, Job job, - boolean addDependencyJars) - throws IOException { - initTableMapperJob(table, scan, mapper, outputKeyClass, - outputValueClass, job, addDependencyJars, TableInputFormat.class); - } - - /** - * Enable a basic on-heap cache for these jobs. Any BlockCache implementation based on - * direct memory will likely cause the map tasks to OOM when opening the region. This - * is done here instead of in TableSnapshotRegionRecordReader in case an advanced user - * wants to override this behavior in their job. - */ - public static void resetCacheConfig(Configuration conf) { - conf.setFloat( - HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, HConstants.HFILE_BLOCK_CACHE_SIZE_DEFAULT); - conf.setFloat(HConstants.BUCKET_CACHE_SIZE_KEY, 0f); - conf.unset(HConstants.BUCKET_CACHE_IOENGINE_KEY); - } - - /** - * Sets up the job for reading from one or more table snapshots, with one or more scans - * per snapshot. - * It bypasses hbase servers and read directly from snapshot files. - * - * @param snapshotScans map of snapshot name to scans on that snapshot. - * @param mapper The mapper class to use. - * @param outputKeyClass The class of the output key. - * @param outputValueClass The class of the output value. - * @param job The current job to adjust. Make sure the passed job is - * carrying all necessary HBase configuration. - * @param addDependencyJars upload HBase jars and jars for any of the configured - * job classes via the distributed cache (tmpjars). - */ - public static void initMultiTableSnapshotMapperJob(Map<String, Collection<Scan>> snapshotScans, - Class<? 
extends TableMapper> mapper, Class<?> outputKeyClass, Class<?> outputValueClass, - Job job, boolean addDependencyJars, Path tmpRestoreDir) throws IOException { - MultiTableSnapshotInputFormat.setInput(job.getConfiguration(), snapshotScans, tmpRestoreDir); - - job.setInputFormatClass(MultiTableSnapshotInputFormat.class); - if (outputValueClass != null) { - job.setMapOutputValueClass(outputValueClass); - } - if (outputKeyClass != null) { - job.setMapOutputKeyClass(outputKeyClass); - } - job.setMapperClass(mapper); - Configuration conf = job.getConfiguration(); - HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf)); - - if (addDependencyJars) { - addDependencyJars(job); - addDependencyJarsForClasses(job.getConfiguration(), MetricRegistry.class); - } - - resetCacheConfig(job.getConfiguration()); - } - - /** - * Sets up the job for reading from a table snapshot. It bypasses hbase servers - * and read directly from snapshot files. - * - * @param snapshotName The name of the snapshot (of a table) to read from. - * @param scan The scan instance with the columns, time range etc. - * @param mapper The mapper class to use. - * @param outputKeyClass The class of the output key. - * @param outputValueClass The class of the output value. - * @param job The current job to adjust. Make sure the passed job is - * carrying all necessary HBase configuration. - * @param addDependencyJars upload HBase jars and jars for any of the configured - * job classes via the distributed cache (tmpjars). - * - * @param tmpRestoreDir a temporary directory to copy the snapshot files into. Current user should - * have write permissions to this directory, and this should not be a subdirectory of rootdir. - * After the job is finished, restore directory can be deleted. - * @throws IOException When setting up the details fails. - * @see TableSnapshotInputFormat - */ - public static void initTableSnapshotMapperJob(String snapshotName, Scan scan, - Class<? extends TableMapper> mapper, - Class<?> outputKeyClass, - Class<?> outputValueClass, Job job, - boolean addDependencyJars, Path tmpRestoreDir) - throws IOException { - TableSnapshotInputFormat.setInput(job, snapshotName, tmpRestoreDir); - initTableMapperJob(snapshotName, scan, mapper, outputKeyClass, - outputValueClass, job, addDependencyJars, false, TableSnapshotInputFormat.class); - resetCacheConfig(job.getConfiguration()); - } - - /** - * Use this before submitting a Multi TableMap job. It will appropriately set - * up the job. - * - * @param scans The list of {@link Scan} objects to read from. - * @param mapper The mapper class to use. - * @param outputKeyClass The class of the output key. - * @param outputValueClass The class of the output value. - * @param job The current job to adjust. Make sure the passed job is carrying - * all necessary HBase configuration. - * @throws IOException When setting up the details fails. - */ - public static void initTableMapperJob(List<Scan> scans, - Class<? extends TableMapper> mapper, - Class<?> outputKeyClass, - Class<?> outputValueClass, Job job) throws IOException { - initTableMapperJob(scans, mapper, outputKeyClass, outputValueClass, job, - true); - } - - /** - * Use this before submitting a Multi TableMap job. It will appropriately set - * up the job. - * - * @param scans The list of {@link Scan} objects to read from. - * @param mapper The mapper class to use. - * @param outputKeyClass The class of the output key. - * @param outputValueClass The class of the output value. - * @param job The current job to adjust. 
Make sure the passed job is carrying - * all necessary HBase configuration. - * @param addDependencyJars upload HBase jars and jars for any of the - * configured job classes via the distributed cache (tmpjars). - * @throws IOException When setting up the details fails. - */ - public static void initTableMapperJob(List<Scan> scans, - Class<? extends TableMapper> mapper, - Class<?> outputKeyClass, - Class<?> outputValueClass, Job job, - boolean addDependencyJars) throws IOException { - initTableMapperJob(scans, mapper, outputKeyClass, outputValueClass, job, - addDependencyJars, true); - } - - /** - * Use this before submitting a Multi TableMap job. It will appropriately set - * up the job. - * - * @param scans The list of {@link Scan} objects to read from. - * @param mapper The mapper class to use. - * @param outputKeyClass The class of the output key. - * @param outputValueClass The class of the output value. - * @param job The current job to adjust. Make sure the passed job is carrying - * all necessary HBase configuration. - * @param addDependencyJars upload HBase jars and jars for any of the - * configured job classes via the distributed cache (tmpjars). - * @param initCredentials whether to initialize hbase auth credentials for the job - * @throws IOException When setting up the details fails. - */ - public static void initTableMapperJob(List<Scan> scans, - Class<? extends TableMapper> mapper, - Class<?> outputKeyClass, - Class<?> outputValueClass, Job job, - boolean addDependencyJars, - boolean initCredentials) throws IOException { - job.setInputFormatClass(MultiTableInputFormat.class); - if (outputValueClass != null) { - job.setMapOutputValueClass(outputValueClass); - } - if (outputKeyClass != null) { - job.setMapOutputKeyClass(outputKeyClass); - } - job.setMapperClass(mapper); - Configuration conf = job.getConfiguration(); - HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf)); - List<String> scanStrings = new ArrayList<>(); - - for (Scan scan : scans) { - scanStrings.add(convertScanToString(scan)); - } - job.getConfiguration().setStrings(MultiTableInputFormat.SCANS, - scanStrings.toArray(new String[scanStrings.size()])); - - if (addDependencyJars) { - addDependencyJars(job); - } - - if (initCredentials) { - initCredentials(job); - } - } - - public static void initCredentials(Job job) throws IOException { - UserProvider userProvider = UserProvider.instantiate(job.getConfiguration()); - if (userProvider.isHadoopSecurityEnabled()) { - // propagate delegation related props from launcher job to MR job - if (System.getenv("HADOOP_TOKEN_FILE_LOCATION") != null) { - job.getConfiguration().set("mapreduce.job.credentials.binary", - System.getenv("HADOOP_TOKEN_FILE_LOCATION")); - } - } - - if (userProvider.isHBaseSecurityEnabled()) { - try { - // init credentials for remote cluster - String quorumAddress = job.getConfiguration().get(TableOutputFormat.QUORUM_ADDRESS); - User user = userProvider.getCurrent(); - if (quorumAddress != null) { - Configuration peerConf = HBaseConfiguration.createClusterConf(job.getConfiguration(), - quorumAddress, TableOutputFormat.OUTPUT_CONF_PREFIX); - Connection peerConn = ConnectionFactory.createConnection(peerConf); - try { - TokenUtil.addTokenForJob(peerConn, user, job); - } finally { - peerConn.close(); - } - } - - Connection conn = ConnectionFactory.createConnection(job.getConfiguration()); - try { - TokenUtil.addTokenForJob(conn, user, job); - } finally { - conn.close(); - } - } catch (InterruptedException ie) { - LOG.info("Interrupted obtaining 
user authentication token"); - Thread.currentThread().interrupt(); - } - } - } - - /** - * Obtain an authentication token, for the specified cluster, on behalf of the current user - * and add it to the credentials for the given map reduce job. - * - * The quorumAddress is the key to the ZK ensemble, which contains: - * hbase.zookeeper.quorum, hbase.zookeeper.client.port and - * zookeeper.znode.parent - * - * @param job The job that requires the permission. - * @param quorumAddress string that contains the 3 required configuratins - * @throws IOException When the authentication token cannot be obtained. - * @deprecated Since 1.2.0, use {@link #initCredentialsForCluster(Job, Configuration)} instead. - */ - @Deprecated - public static void initCredentialsForCluster(Job job, String quorumAddress) - throws IOException { - Configuration peerConf = HBaseConfiguration.createClusterConf(job.getConfiguration(), - quorumAddress); - initCredentialsForCluster(job, peerConf); - } - - /** - * Obtain an authentication token, for the specified cluster, on behalf of the current user - * and add it to the credentials for the given map reduce job. - * - * @param job The job that requires the permission. - * @param conf The configuration to use in connecting to the peer cluster - * @throws IOException When the authentication token cannot be obtained. - */ - public static void initCredentialsForCluster(Job job, Configuration conf) - throws IOException { - UserProvider userProvider = UserProvider.instantiate(job.getConfiguration()); - if (userProvider.isHBaseSecurityEnabled()) { - try { - Connection peerConn = ConnectionFactory.createConnection(conf); - try { - TokenUtil.addTokenForJob(peerConn, userProvider.getCurrent(), job); - } finally { - peerConn.close(); - } - } catch (InterruptedException e) { - LOG.info("Interrupted obtaining user authentication token"); - Thread.interrupted(); - } - } - } - - /** - * Writes the given scan into a Base64 encoded string. - * - * @param scan The scan to write out. - * @return The scan saved in a Base64 encoded string. - * @throws IOException When writing the scan fails. - */ - public static String convertScanToString(Scan scan) throws IOException { - ClientProtos.Scan proto = ProtobufUtil.toScan(scan); - return Base64.encodeBytes(proto.toByteArray()); - } - - /** - * Converts the given Base64 string back into a Scan instance. - * - * @param base64 The scan details. - * @return The newly created Scan instance. - * @throws IOException When reading the scan instance fails. - */ - public static Scan convertStringToScan(String base64) throws IOException { - byte [] decoded = Base64.decode(base64); - return ProtobufUtil.toScan(ClientProtos.Scan.parseFrom(decoded)); - } - - /** - * Use this before submitting a TableReduce job. It will - * appropriately set up the JobConf. - * - * @param table The output table. - * @param reducer The reducer class to use. - * @param job The current job to adjust. - * @throws IOException When determining the region count fails. - */ - public static void initTableReducerJob(String table, - Class<? extends TableReducer> reducer, Job job) - throws IOException { - initTableReducerJob(table, reducer, job, null); - } - - /** - * Use this before submitting a TableReduce job. It will - * appropriately set up the JobConf. - * - * @param table The output table. - * @param reducer The reducer class to use. - * @param job The current job to adjust. - * @param partitioner Partitioner to use. Pass <code>null</code> to use - * default partitioner. 
- * @throws IOException When determining the region count fails. - */ - public static void initTableReducerJob(String table, - Class<? extends TableReducer> reducer, Job job, - Class partitioner) throws IOException { - initTableReducerJob(table, reducer, job, partitioner, null, null, null); - } - - /** - * Use this before submitting a TableReduce job. It will - * appropriately set up the JobConf. - * - * @param table The output table. - * @param reducer The reducer class to use. - * @param job The current job to adjust. Make sure the passed job is - * carrying all necessary HBase configuration. - * @param partitioner Partitioner to use. Pass <code>null</code> to use - * default partitioner. - * @param quorumAddress Distant cluster to write to; default is null for - * output to the cluster that is designated in <code>hbase-site.xml</code>. - * Set this String to the zookeeper ensemble of an alternate remote cluster - * when you would have the reduce write a cluster that is other than the - * default; e.g. copying tables between clusters, the source would be - * designated by <code>hbase-site.xml</code> and this param would have the - * ensemble address of the remote cluster. The format to pass is particular. - * Pass <code> <hbase.zookeeper.quorum>:< - * hbase.zookeeper.client.port>:<zookeeper.znode.parent> - * </code> such as <code>server,server2,server3:2181:/hbase</code>. - * @param serverClass redefined hbase.regionserver.class - * @param serverImpl redefined hbase.regionserver.impl - * @throws IOException When determining the region count fails. - */ - public static void initTableReducerJob(String table, - Class<? extends TableReducer> reducer, Job job, - Class partitioner, String quorumAddress, String serverClass, - String serverImpl) throws IOException { - initTableReducerJob(table, reducer, job, partitioner, quorumAddress, - serverClass, serverImpl, true); - } - - /** - * Use this before submitting a TableReduce job. It will - * appropriately set up the JobConf. - * - * @param table The output table. - * @param reducer The reducer class to use. - * @param job The current job to adjust. Make sure the passed job is - * carrying all necessary HBase configuration. - * @param partitioner Partitioner to use. Pass <code>null</code> to use - * default partitioner. - * @param quorumAddress Distant cluster to write to; default is null for - * output to the cluster that is designated in <code>hbase-site.xml</code>. - * Set this String to the zookeeper ensemble of an alternate remote cluster - * when you would have the reduce write a cluster that is other than the - * default; e.g. copying tables between clusters, the source would be - * designated by <code>hbase-site.xml</code> and this param would have the - * ensemble address of the remote cluster. The format to pass is particular. - * Pass <code> <hbase.zookeeper.quorum>:< - * hbase.zookeeper.client.port>:<zookeeper.znode.parent> - * </code> such as <code>server,server2,server3:2181:/hbase</code>. - * @param serverClass redefined hbase.regionserver.class - * @param serverImpl redefined hbase.regionserver.impl - * @param addDependencyJars upload HBase jars and jars for any of the configured - * job classes via the distributed cache (tmpjars). - * @throws IOException When determining the region count fails. - */ - public static void initTableReducerJob(String table, - Class<? 
extends TableReducer> reducer, Job job, - Class partitioner, String quorumAddress, String serverClass, - String serverImpl, boolean addDependencyJars) throws IOException { - - Configuration conf = job.getConfiguration(); - HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf)); - job.setOutputFormatClass(TableOutputFormat.class); - if (reducer != null) job.setReducerClass(reducer); - conf.set(TableOutputFormat.OUTPUT_TABLE, table); - conf.setStrings("io.serializations", conf.get("io.serializations"), - MutationSerialization.class.getName(), ResultSerialization.class.getName()); - // If passed a quorum/ensemble address, pass it on to TableOutputFormat. - if (quorumAddress != null) { - // Calling this will validate the format - ZKConfig.validateClusterKey(quorumAddress); - conf.set(TableOutputFormat.QUORUM_ADDRESS,quorumAddress); - } - if (serverClass != null && serverImpl != null) { - conf.set(TableOutputFormat.REGION_SERVER_CLASS, serverClass); - conf.set(TableOutputFormat.REGION_SERVER_IMPL, serverImpl); - } - job.setOutputKeyClass(ImmutableBytesWritable.class); - job.setOutputValueClass(Writable.class); - if (partitioner == HRegionPartitioner.class) { - job.setPartitionerClass(HRegionPartitioner.class); - int regions = MetaTableAccessor.getRegionCount(conf, TableName.valueOf(table)); - if (job.getNumReduceTasks() > regions) { - job.setNumReduceTasks(regions); - } - } else if (partitioner != null) { - job.setPartitionerClass(partitioner); - } - - if (addDependencyJars) { - addDependencyJars(job); - } - - initCredentials(job); - } - - /** - * Ensures that the given number of reduce tasks for the given job - * configuration does not exceed the number of regions for the given table. - * - * @param table The table to get the region count for. - * @param job The current job to adjust. - * @throws IOException When retrieving the table details fails. - */ - public static void limitNumReduceTasks(String table, Job job) - throws IOException { - int regions = - MetaTableAccessor.getRegionCount(job.getConfiguration(), TableName.valueOf(table)); - if (job.getNumReduceTasks() > regions) - job.setNumReduceTasks(regions); - } - - /** - * Sets the number of reduce tasks for the given job configuration to the - * number of regions the given table has. - * - * @param table The table to get the region count for. - * @param job The current job to adjust. - * @throws IOException When retrieving the table details fails. - */ - public static void setNumReduceTasks(String table, Job job) - throws IOException { - job.setNumReduceTasks(MetaTableAccessor.getRegionCount(job.getConfiguration(), - TableName.valueOf(table))); - } - - /** - * Sets the number of rows to return and cache with each scanner iteration. - * Higher caching values will enable faster mapreduce jobs at the expense of - * requiring more heap to contain the cached rows. - * - * @param job The current job to adjust. - * @param batchSize The number of rows to return in batch with each scanner - * iteration. - */ - public static void setScannerCaching(Job job, int batchSize) { - job.getConfiguration().setInt("hbase.client.scanner.caching", batchSize); - } - - /** - * Add HBase and its dependencies (only) to the job configuration. - * <p> - * This is intended as a low-level API, facilitating code reuse between this - * class and its mapred counterpart. It also of use to external tools that - * need to build a MapReduce job that interacts with HBase but want - * fine-grained control over the jars shipped to the cluster. 
- * </p> - * @param conf The Configuration object to extend with dependencies. - * @see org.apache.hadoop.hbase.mapred.TableMapReduceUtil - * @see <a href="https://issues.apache.org/jira/browse/PIG-3285">PIG-3285</a> - */ - public static void addHBaseDependencyJars(Configuration conf) throws IOException { - - // PrefixTreeCodec is part of the hbase-prefix-tree module. If not included in MR jobs jar - // dependencies, MR jobs that write encoded hfiles will fail. - // We used reflection here so to prevent a circular module dependency. - // TODO - if we extract the MR into a module, make it depend on hbase-prefix-tree. - Class prefixTreeCodecClass = null; - try { - prefixTreeCodecClass = - Class.forName("org.apache.hadoop.hbase.codec.prefixtree.PrefixTreeCodec"); - } catch (ClassNotFoundException e) { - // this will show up in unit tests but should not show in real deployments - LOG.warn("The hbase-prefix-tree module jar containing PrefixTreeCodec is not present." + - " Continuing without it."); - } - - addDependencyJarsForClasses(conf, - // explicitly pull a class from each module - org.apache.hadoop.hbase.HConstants.class, // hbase-common - org.apache.hadoop.hbase.protobuf.generated.ClientProtos.class, // hbase-protocol - org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.class, // hbase-protocol-shaded - org.apache.hadoop.hbase.client.Put.class, // hbase-client - org.apache.hadoop.hbase.CompatibilityFactory.class, // hbase-hadoop-compat - org.apache.hadoop.hbase.mapreduce.JobUtil.class, // hbase-hadoop2-compat - org.apache.hadoop.hbase.mapreduce.TableMapper.class, // hbase-server - org.apache.hadoop.hbase.metrics.impl.FastLongHistogram.class, // hbase-metrics - org.apache.hadoop.hbase.metrics.Snapshot.class, // hbase-metrics-api - prefixTreeCodecClass, // hbase-prefix-tree (if null will be skipped) - // pull necessary dependencies - org.apache.zookeeper.ZooKeeper.class, - org.apache.hadoop.hbase.shaded.io.netty.channel.Channel.class, - com.google.protobuf.Message.class, - org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists.class, - org.apache.htrace.Trace.class, - com.codahale.metrics.MetricRegistry.class); - } - - /** - * Returns a classpath string built from the content of the "tmpjars" value in {@code conf}. - * Also exposed to shell scripts via `bin/hbase mapredcp`. - */ - public static String buildDependencyClasspath(Configuration conf) { - if (conf == null) { - throw new IllegalArgumentException("Must provide a configuration object."); - } - Set<String> paths = new HashSet<>(conf.getStringCollection("tmpjars")); - if (paths.isEmpty()) { - throw new IllegalArgumentException("Configuration contains no tmpjars."); - } - StringBuilder sb = new StringBuilder(); - for (String s : paths) { - // entries can take the form 'file:/path/to/file.jar'. - int idx = s.indexOf(":"); - if (idx != -1) s = s.substring(idx + 1); - if (sb.length() > 0) sb.append(File.pathSeparator); - sb.append(s); - } - return sb.toString(); - } - - /** - * Add the HBase dependency jars as well as jars for any of the configured - * job classes to the job configuration, so that JobClient will ship them - * to the cluster and add them to the DistributedCache. 
- */ - public static void addDependencyJars(Job job) throws IOException { - addHBaseDependencyJars(job.getConfiguration()); - try { - addDependencyJarsForClasses(job.getConfiguration(), - // when making changes here, consider also mapred.TableMapReduceUtil - // pull job classes - job.getMapOutputKeyClass(), - job.getMapOutputValueClass(), - job.getInputFormatClass(), - job.getOutputKeyClass(), - job.getOutputValueClass(), - job.getOutputFormatClass(), - job.getPartitionerClass(), - job.getCombinerClass()); - } catch (ClassNotFoundException e) { - throw new IOException(e); - } - } - - /** - * Add the jars containing the given classes to the job's configuration - * such that JobClient will ship them to the cluster and add them to - * the DistributedCache. - * @deprecated rely on {@link #addDependencyJars(Job)} instead. - */ - @Deprecated - public static void addDependencyJars(Configuration conf, - Class<?>... classes) throws IOException { - LOG.warn("The addDependencyJars(Configuration, Class<?>...) method has been deprecated since it" - + " is easy to use incorrectly. Most users should rely on addDependencyJars(Job) " + - "instead. See HBASE-8386 for more details."); - addDependencyJarsForClasses(conf, classes); - } - - /** - * Add the jars containing the given classes to the job's configuration - * such that JobClient will ship them to the cluster and add them to - * the DistributedCache. - * - * N.B. that this method at most adds one jar per class given. If there is more than one - * jar available containing a class with the same name as a given class, we don't define - * which of those jars might be chosen. - * - * @param conf The Hadoop Configuration to modify - * @param classes will add just those dependencies needed to find the given classes - * @throws IOException if an underlying library call fails. - */ - @InterfaceAudience.Private - public static void addDependencyJarsForClasses(Configuration conf, - Class<?>... classes) throws IOException { - - FileSystem localFs = FileSystem.getLocal(conf); - Set<String> jars = new HashSet<>(); - // Add jars that are already in the tmpjars variable - jars.addAll(conf.getStringCollection("tmpjars")); - - // add jars as we find them to a map of contents jar name so that we can avoid - // creating new jars for classes that have already been packaged. - Map<String, String> packagedClasses = new HashMap<>(); - - // Add jars containing the specified classes - for (Class<?> clazz : classes) { - if (clazz == null) continue; - - Path path = findOrCreateJar(clazz, localFs, packagedClasses); - if (path == null) { - LOG.warn("Could not find jar for class " + clazz + - " in order to ship it to the cluster."); - continue; - } - if (!localFs.exists(path)) { - LOG.warn("Could not validate jar file " + path + " for class " - + clazz); - continue; - } - jars.add(path.toString()); - } - if (jars.isEmpty()) return; - - conf.set("tmpjars", StringUtils.arrayToString(jars.toArray(new String[jars.size()]))); - } - - /** - * Finds the Jar for a class or creates it if it doesn't exist. If the class is in - * a directory in the classpath, it creates a Jar on the fly with the - * contents of the directory and returns the path to that Jar. If a Jar is - * created, it is created in the system temporary directory. Otherwise, - * returns an existing jar that contains a class of the same name. Maintains - * a mapping from jar contents to the tmp jar created. - * @param my_class the class to find. - * @param fs the FileSystem with which to qualify the returned path. 
- * @param packagedClasses a map of class name to path. - * @return a jar file that contains the class. - * @throws IOException - */ - private static Path findOrCreateJar(Class<?> my_class, FileSystem fs, - Map<String, String> packagedClasses) - throws IOException { - // attempt to locate an existing jar for the class. - String jar = findContainingJar(my_class, packagedClasses); - if (null == jar || jar.isEmpty()) { - jar = getJar(my_class); - updateMap(jar, packagedClasses); - } - - if (null == jar || jar.isEmpty()) { - return null; - } - - LOG.debug(String.format("For class %s, using jar %s", my_class.getName(), jar)); - return new Path(jar).makeQualified(fs); - } - - /** - * Add entries to <code>packagedClasses</code> corresponding to class files - * contained in <code>jar</code>. - * @param jar The jar who's content to list. - * @param packagedClasses map[class -> jar] - */ - private static void updateMap(String jar, Map<String, String> packagedClasses) throws IOException { - if (null == jar || jar.isEmpty()) { - return; - } - ZipFile zip = null; - try { - zip = new ZipFile(jar); - for (Enumeration<? extends ZipEntry> iter = zip.entries(); iter.hasMoreElements();) { - ZipEntry entry = iter.nextElement(); - if (entry.getName().endsWith("class")) { - packagedClasses.put(entry.getName(), jar); - } - } - } finally { - if (null != zip) zip.close(); - } - } - - /** - * Find a jar that contains a class of the same name, if any. It will return - * a jar file, even if that is not the first thing on the class path that - * has a class with the same name. Looks first on the classpath and then in - * the <code>packagedClasses</code> map. - * @param my_class the class to find. - * @return a jar file that contains the class, or null. - * @throws IOException - */ - private static String findContainingJar(Class<?> my_class, Map<String, String> packagedClasses) - throws IOException { - ClassLoader loader = my_class.getClassLoader(); - - String class_file = my_class.getName().replaceAll("\\.", "/") + ".class"; - - if (loader != null) { - // first search the classpath - for (Enumeration<URL> itr = loader.getResources(class_file); itr.hasMoreElements();) { - URL url = itr.nextElement(); - if ("jar".equals(url.getProtocol())) { - String toReturn = url.getPath(); - if (toReturn.startsWith("file:")) { - toReturn = toReturn.substring("file:".length()); - } - // URLDecoder is a misnamed class, since it actually decodes - // x-www-form-urlencoded MIME type rather than actual - // URL encoding (which the file path has). Therefore it would - // decode +s to ' 's which is incorrect (spaces are actually - // either unencoded or encoded as "%20"). Replace +s first, so - // that they are kept sacred during the decoding process. - toReturn = toReturn.replaceAll("\\+", "%2B"); - toReturn = URLDecoder.decode(toReturn, "UTF-8"); - return toReturn.replaceAll("!.*$", ""); - } - } - } - - // now look in any jars we've packaged using JarFinder. Returns null when - // no jar is found. - return packagedClasses.get(class_file); - } - - /** - * Invoke 'getJar' on a custom JarFinder implementation. Useful for some job - * configuration contexts (HBASE-8140) and also for testing on MRv2. - * check if we have HADOOP-9426. - * @param my_class the class to find. - * @return a jar file that contains the class, or null. 
- */ - private static String getJar(Class<?> my_class) { - String ret = null; - try { - ret = JarFinder.getJar(my_class); - } catch (Exception e) { - // toss all other exceptions, related to reflection failure - throw new RuntimeException("getJar invocation failed.", e); - } - - return ret; - } -}
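For context, a driver for the utility above typically looks like the following sketch. It is not part of the deleted file; the class, the mapper, and the table names ("source_table", "dest_table") are hypothetical placeholders used only to illustrate how initTableMapperJob and initTableReducerJob are commonly combined into a map-only copy job.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.mapreduce.Job;

public class ExampleCopyDriver {

  /** Re-emits every cell of each scanned row as a Put keyed by the row. */
  static class CopyMapper extends TableMapper<ImmutableBytesWritable, Put> {
    @Override
    protected void map(ImmutableBytesWritable row, Result value, Context context)
        throws IOException, InterruptedException {
      Put put = new Put(row.copyBytes());
      for (Cell cell : value.rawCells()) {
        put.add(cell);
      }
      context.write(row, put);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    Job job = Job.getInstance(conf, "example-table-copy");
    job.setJarByClass(ExampleCopyDriver.class);

    Scan scan = new Scan();
    scan.setCaching(500);        // fewer scanner round-trips per map task
    scan.setCacheBlocks(false);  // full scans should not churn the block cache

    // Sets TableInputFormat, serializes the Scan into the configuration, adds the
    // HBase dependency jars to tmpjars and obtains auth tokens if security is on.
    TableMapReduceUtil.initTableMapperJob("source_table", scan, CopyMapper.class,
        ImmutableBytesWritable.class, Put.class, job);

    // Sets TableOutputFormat for the destination table; a null reducer plus
    // zero reduce tasks makes this a map-only write.
    TableMapReduceUtil.initTableReducerJob("dest_table", null, job);
    job.setNumReduceTasks(0);

    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}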
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapper.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapper.java deleted file mode 100644 index 9a7dcb7..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapper.java +++ /dev/null @@ -1,38 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.mapreduce; - -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.mapreduce.Mapper; - -/** - * Extends the base <code>Mapper</code> class to add the required input key - * and value classes. - * - * @param <KEYOUT> The type of the key. - * @param <VALUEOUT> The type of the value. - * @see org.apache.hadoop.mapreduce.Mapper - */ -@InterfaceAudience.Public -public abstract class TableMapper<KEYOUT, VALUEOUT> -extends Mapper<ImmutableBytesWritable, Result, KEYOUT, VALUEOUT> { - -} http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputCommitter.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputCommitter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputCommitter.java deleted file mode 100644 index 749fd85..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputCommitter.java +++ /dev/null @@ -1,67 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License.
- */ -package org.apache.hadoop.hbase.mapreduce; - -import java.io.IOException; - -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.mapreduce.JobContext; -import org.apache.hadoop.mapreduce.OutputCommitter; -import org.apache.hadoop.mapreduce.TaskAttemptContext; - -/** - * Small committer class that does not do anything. - */ -@InterfaceAudience.Public -public class TableOutputCommitter extends OutputCommitter { - - @Override - public void abortTask(TaskAttemptContext arg0) throws IOException { - } - - @Override - public void cleanupJob(JobContext arg0) throws IOException { - } - - @Override - public void commitTask(TaskAttemptContext arg0) throws IOException { - } - - @Override - public boolean needsTaskCommit(TaskAttemptContext arg0) throws IOException { - return false; - } - - @Override - public void setupJob(JobContext arg0) throws IOException { - } - - @Override - public void setupTask(TaskAttemptContext arg0) throws IOException { - } - - public boolean isRecoverySupported() { - return true; - } - - public void recoverTask(TaskAttemptContext taskContext) - throws IOException - { - } -} http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java deleted file mode 100644 index 5986df8..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java +++ /dev/null @@ -1,239 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License.
- */ -package org.apache.hadoop.hbase.mapreduce; - -import java.io.IOException; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.conf.Configurable; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.TableNotEnabledException; -import org.apache.hadoop.hbase.TableNotFoundException; -import org.apache.hadoop.hbase.client.Admin; -import org.apache.hadoop.hbase.client.BufferedMutator; -import org.apache.hadoop.hbase.client.Connection; -import org.apache.hadoop.hbase.client.ConnectionFactory; -import org.apache.hadoop.hbase.client.Delete; -import org.apache.hadoop.hbase.client.Mutation; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.mapreduce.JobContext; -import org.apache.hadoop.mapreduce.OutputCommitter; -import org.apache.hadoop.mapreduce.OutputFormat; -import org.apache.hadoop.mapreduce.RecordWriter; -import org.apache.hadoop.mapreduce.TaskAttemptContext; - -/** - * Convert Map/Reduce output and write it to an HBase table. The KEY is ignored - * while the output value <u>must</u> be either a {@link Put} or a - * {@link Delete} instance. - */ -@InterfaceAudience.Public -public class TableOutputFormat<KEY> extends OutputFormat<KEY, Mutation> -implements Configurable { - - private static final Log LOG = LogFactory.getLog(TableOutputFormat.class); - - /** Job parameter that specifies the output table. */ - public static final String OUTPUT_TABLE = "hbase.mapred.outputtable"; - - /** - * Prefix for configuration property overrides to apply in {@link #setConf(Configuration)}. - * For keys matching this prefix, the prefix is stripped, and the value is set in the - * configuration with the resulting key, ie. the entry "hbase.mapred.output.key1 = value1" - * would be set in the configuration as "key1 = value1". Use this to set properties - * which should only be applied to the {@code TableOutputFormat} configuration and not the - * input configuration. - */ - public static final String OUTPUT_CONF_PREFIX = "hbase.mapred.output."; - - /** - * Optional job parameter to specify a peer cluster. - * Used specifying remote cluster when copying between hbase clusters (the - * source is picked up from <code>hbase-site.xml</code>). - * @see TableMapReduceUtil#initTableReducerJob(String, Class, org.apache.hadoop.mapreduce.Job, Class, String, String, String) - */ - public static final String QUORUM_ADDRESS = OUTPUT_CONF_PREFIX + "quorum"; - - /** Optional job parameter to specify peer cluster's ZK client port */ - public static final String QUORUM_PORT = OUTPUT_CONF_PREFIX + "quorum.port"; - - /** Optional specification of the rs class name of the peer cluster */ - public static final String - REGION_SERVER_CLASS = OUTPUT_CONF_PREFIX + "rs.class"; - /** Optional specification of the rs impl name of the peer cluster */ - public static final String - REGION_SERVER_IMPL = OUTPUT_CONF_PREFIX + "rs.impl"; - - /** The configuration. */ - private Configuration conf = null; - - /** - * Writes the reducer output to an HBase table.
- */ - protected class TableRecordWriter - extends RecordWriter<KEY, Mutation> { - - private Connection connection; - private BufferedMutator mutator; - - /** - * @throws IOException - * - */ - public TableRecordWriter() throws IOException { - String tableName = conf.get(OUTPUT_TABLE); - this.connection = ConnectionFactory.createConnection(conf); - this.mutator = connection.getBufferedMutator(TableName.valueOf(tableName)); - LOG.info("Created table instance for " + tableName); - } - /** - * Closes the writer, in this case flush table commits. - * - * @param context The context. - * @throws IOException When closing the writer fails. - * @see RecordWriter#close(TaskAttemptContext) - */ - @Override - public void close(TaskAttemptContext context) throws IOException { - try { - if (mutator != null) { - mutator.close(); - } - } finally { - if (connection != null) { - connection.close(); - } - } - } - - /** - * Writes a key/value pair into the table. - * - * @param key The key. - * @param value The value. - * @throws IOException When writing fails. - * @see RecordWriter#write(Object, Object) - */ - @Override - public void write(KEY key, Mutation value) - throws IOException { - if (!(value instanceof Put) && !(value instanceof Delete)) { - throw new IOException("Pass a Delete or a Put"); - } - mutator.mutate(value); - } - } - - /** - * Creates a new record writer. - * - * Be aware that the baseline javadoc gives the impression that there is a single - * {@link RecordWriter} per job but in HBase, it is more natural if we give you a new - * RecordWriter per call of this method. You must close the returned RecordWriter when done. - * Failure to do so will drop writes. - * - * @param context The current task context. - * @return The newly created writer instance. - * @throws IOException When creating the writer fails. - * @throws InterruptedException When the jobs is cancelled. - */ - @Override - public RecordWriter<KEY, Mutation> getRecordWriter(TaskAttemptContext context) - throws IOException, InterruptedException { - return new TableRecordWriter(); - } - - /** - * Checks if the output table exists and is enabled. - * - * @param context The current context. - * @throws IOException When the check fails. - * @throws InterruptedException When the job is aborted. - * @see OutputFormat#checkOutputSpecs(JobContext) - */ - @Override - public void checkOutputSpecs(JobContext context) throws IOException, - InterruptedException { - - try (Admin admin = ConnectionFactory.createConnection(getConf()).getAdmin()) { - TableName tableName = TableName.valueOf(this.conf.get(OUTPUT_TABLE)); - if (!admin.tableExists(tableName)) { - throw new TableNotFoundException("Can't write, table does not exist:" + - tableName.getNameAsString()); - } - - if (!admin.isTableEnabled(tableName)) { - throw new TableNotEnabledException("Can't write, table is not enabled: " + - tableName.getNameAsString()); - } - } - } - - /** - * Returns the output committer. - * - * @param context The current context. - * @return The committer. - * @throws IOException When creating the committer fails. - * @throws InterruptedException When the job is aborted. 
- * @see OutputFormat#getOutputCommitter(TaskAttemptContext) - */ - @Override - public OutputCommitter getOutputCommitter(TaskAttemptContext context) - throws IOException, InterruptedException { - return new TableOutputCommitter(); - } - - @Override - public Configuration getConf() { - return conf; - } - - @Override - public void setConf(Configuration otherConf) { - String tableName = otherConf.get(OUTPUT_TABLE); - if(tableName == null || tableName.length() <= 0) { - throw new IllegalArgumentException("Must specify table name"); - } - - String address = otherConf.get(QUORUM_ADDRESS); - int zkClientPort = otherConf.getInt(QUORUM_PORT, 0); - String serverClass = otherConf.get(REGION_SERVER_CLASS); - String serverImpl = otherConf.get(REGION_SERVER_IMPL); - - try { - this.conf = HBaseConfiguration.createClusterConf(otherConf, address, OUTPUT_CONF_PREFIX); - - if (serverClass != null) { - this.conf.set(HConstants.REGION_SERVER_IMPL, serverImpl); - } - if (zkClientPort != 0) { - this.conf.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, zkClientPort); - } - } catch(IOException e) { - LOG.error(e); - throw new RuntimeException(e); - } - } -} http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReader.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReader.java deleted file mode 100644 index f66520b..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReader.java +++ /dev/null @@ -1,147 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.mapreduce; - -import java.io.IOException; - -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.client.Table; -import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.mapreduce.InputSplit; -import org.apache.hadoop.mapreduce.RecordReader; -import org.apache.hadoop.mapreduce.TaskAttemptContext; - -/** - * Iterate over an HBase table data, return (ImmutableBytesWritable, Result) - * pairs. - */ -@InterfaceAudience.Public -public class TableRecordReader -extends RecordReader<ImmutableBytesWritable, Result> { - - private TableRecordReaderImpl recordReaderImpl = new TableRecordReaderImpl(); - - /** - * Restart from survivable exceptions by creating a new scanner. - * - * @param firstRow The first row to start at. - * @throws IOException When restarting fails.
- */ - public void restart(byte[] firstRow) throws IOException { - this.recordReaderImpl.restart(firstRow); - } - - /** - * @param table the {@link Table} to scan. - */ - public void setTable(Table table) { - this.recordReaderImpl.setHTable(table); - } - - /** - * Sets the scan defining the actual details like columns etc. - * - * @param scan The scan to set. - */ - public void setScan(Scan scan) { - this.recordReaderImpl.setScan(scan); - } - - /** - * Closes the split. - * - * @see org.apache.hadoop.mapreduce.RecordReader#close() - */ - @Override - public void close() { - this.recordReaderImpl.close(); - } - - /** - * Returns the current key. - * - * @return The current key. - * @throws IOException - * @throws InterruptedException When the job is aborted. - * @see org.apache.hadoop.mapreduce.RecordReader#getCurrentKey() - */ - @Override - public ImmutableBytesWritable getCurrentKey() throws IOException, - InterruptedException { - return this.recordReaderImpl.getCurrentKey(); - } - - /** - * Returns the current value. - * - * @return The current value. - * @throws IOException When the value is faulty. - * @throws InterruptedException When the job is aborted. - * @see org.apache.hadoop.mapreduce.RecordReader#getCurrentValue() - */ - @Override - public Result getCurrentValue() throws IOException, InterruptedException { - return this.recordReaderImpl.getCurrentValue(); - } - - /** - * Initializes the reader. - * - * @param inputsplit The split to work with. - * @param context The current task context. - * @throws IOException When setting up the reader fails. - * @throws InterruptedException When the job is aborted. - * @see org.apache.hadoop.mapreduce.RecordReader#initialize( - * org.apache.hadoop.mapreduce.InputSplit, - * org.apache.hadoop.mapreduce.TaskAttemptContext) - */ - @Override - public void initialize(InputSplit inputsplit, - TaskAttemptContext context) throws IOException, - InterruptedException { - this.recordReaderImpl.initialize(inputsplit, context); - } - - /** - * Positions the record reader to the next record. - * - * @return <code>true</code> if there was another record. - * @throws IOException When reading the record failed. - * @throws InterruptedException When the job was aborted. - * @see org.apache.hadoop.mapreduce.RecordReader#nextKeyValue() - */ - @Override - public boolean nextKeyValue() throws IOException, InterruptedException { - return this.recordReaderImpl.nextKeyValue(); - } - - /** - * The current progress of the record reader through its data. - * - * @return A number between 0.0 and 1.0, the fraction of the data read. - * @see org.apache.hadoop.mapreduce.RecordReader#getProgress() - */ - @Override - public float getProgress() { - return this.recordReaderImpl.getProgress(); - } - -} http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReaderImpl.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReaderImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReaderImpl.java deleted file mode 100644 index 9a1c98e..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReaderImpl.java +++ /dev/null @@ -1,315 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
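TableRecordReader is a thin wrapper that forwards every call to TableRecordReaderImpl, which makes it possible to drive a scan by hand (for instance from a test) without a full MapReduce job. The sketch below relies on the implementation detail that initialize() tolerates a null split and context, so treat it as illustrative only; the table "scores" and family "f" are assumed names.

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableRecordReader;
import org.apache.hadoop.hbase.util.Bytes;

public class ManualScanDriver {
  public static void main(String[] args) throws Exception {
    try (Connection conn = ConnectionFactory.createConnection(HBaseConfiguration.create());
         Table table = conn.getTable(TableName.valueOf("scores"))) {   // assumed table
      TableRecordReader reader = new TableRecordReader();
      reader.setTable(table);
      reader.setScan(new Scan().addFamily(Bytes.toBytes("f")));        // assumed family
      // The split and context are only used for counters; a null context is
      // skipped by the implementation, so none are needed for a manual scan.
      reader.initialize(null, null);
      while (reader.nextKeyValue()) {
        ImmutableBytesWritable row = reader.getCurrentKey();
        Result result = reader.getCurrentValue();
        System.out.println(Bytes.toStringBinary(row.get()) + ": " + result.size() + " cells");
      }
      reader.close();  // also closes the underlying table
    }
  }
}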
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.mapreduce; - -import java.io.IOException; -import java.lang.reflect.Method; -import java.util.Map; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.ResultScanner; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.client.ScannerCallable; -import org.apache.hadoop.hbase.client.Table; -import org.apache.hadoop.hbase.client.metrics.ScanMetrics; -import org.apache.hadoop.hbase.DoNotRetryIOException; -import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.mapreduce.Counter; -import org.apache.hadoop.mapreduce.InputSplit; -import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.hadoop.util.StringUtils; - -import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting; - -/** - * Iterate over an HBase table data, return (ImmutableBytesWritable, Result) - * pairs. - */ [email protected] -public class TableRecordReaderImpl { - public static final String LOG_PER_ROW_COUNT - = "hbase.mapreduce.log.scanner.rowcount"; - - private static final Log LOG = LogFactory.getLog(TableRecordReaderImpl.class); - - // HBASE_COUNTER_GROUP_NAME is the name of mapreduce counter group for HBase - @VisibleForTesting - static final String HBASE_COUNTER_GROUP_NAME = "HBase Counters"; - private ResultScanner scanner = null; - private Scan scan = null; - private Scan currentScan = null; - private Table htable = null; - private byte[] lastSuccessfulRow = null; - private ImmutableBytesWritable key = null; - private Result value = null; - private TaskAttemptContext context = null; - private Method getCounter = null; - private long numRestarts = 0; - private long numStale = 0; - private long timestamp; - private int rowcount; - private boolean logScannerActivity = false; - private int logPerRowCount = 100; - - /** - * Restart from survivable exceptions by creating a new scanner. - * - * @param firstRow The first row to start at. - * @throws IOException When restarting fails. 
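Two of the fields above are driven purely by configuration: logScannerActivity comes from ScannerCallable.LOG_SCANNER_ACTIVITY and logPerRowCount from hbase.mapreduce.log.scanner.rowcount (default 100). A small sketch of switching that logging on for a job, with an arbitrary 500-row reporting interval:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.ScannerCallable;
import org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl;
import org.apache.hadoop.mapreduce.Job;

public class ScannerLoggingSetup {

  // Returns a Job whose record readers log scanner activity every 500 rows.
  public static Job newJob() throws Exception {
    Configuration conf = HBaseConfiguration.create();
    conf.setBoolean(ScannerCallable.LOG_SCANNER_ACTIVITY, true);  // checked in setHTable()
    conf.setInt(TableRecordReaderImpl.LOG_PER_ROW_COUNT, 500);    // default is 100
    return Job.getInstance(conf, "scan-with-scanner-logging");
  }
}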
- */ - public void restart(byte[] firstRow) throws IOException { - currentScan = new Scan(scan); - currentScan.withStartRow(firstRow); - currentScan.setScanMetricsEnabled(true); - if (this.scanner != null) { - if (logScannerActivity) { - LOG.info("Closing the previously opened scanner object."); - } - this.scanner.close(); - } - this.scanner = this.htable.getScanner(currentScan); - if (logScannerActivity) { - LOG.info("Current scan=" + currentScan.toString()); - timestamp = System.currentTimeMillis(); - rowcount = 0; - } - } - - /** - * In new mapreduce APIs, TaskAttemptContext has two getCounter methods - * Check if getCounter(String, String) method is available. - * @return The getCounter method or null if not available. - * @throws IOException - */ - protected static Method retrieveGetCounterWithStringsParams(TaskAttemptContext context) - throws IOException { - Method m = null; - try { - m = context.getClass().getMethod("getCounter", - new Class [] {String.class, String.class}); - } catch (SecurityException e) { - throw new IOException("Failed test for getCounter", e); - } catch (NoSuchMethodException e) { - // Ignore - } - return m; - } - - /** - * Sets the HBase table. - * - * @param htable The {@link org.apache.hadoop.hbase.HTableDescriptor} to scan. - */ - public void setHTable(Table htable) { - Configuration conf = htable.getConfiguration(); - logScannerActivity = conf.getBoolean( - ScannerCallable.LOG_SCANNER_ACTIVITY, false); - logPerRowCount = conf.getInt(LOG_PER_ROW_COUNT, 100); - this.htable = htable; - } - - /** - * Sets the scan defining the actual details like columns etc. - * - * @param scan The scan to set. - */ - public void setScan(Scan scan) { - this.scan = scan; - } - - /** - * Build the scanner. Not done in constructor to allow for extension. - * - * @throws IOException - * @throws InterruptedException - */ - public void initialize(InputSplit inputsplit, - TaskAttemptContext context) throws IOException, - InterruptedException { - if (context != null) { - this.context = context; - getCounter = retrieveGetCounterWithStringsParams(context); - } - restart(scan.getStartRow()); - } - - /** - * Closes the split. - * - * - */ - public void close() { - if (this.scanner != null) { - this.scanner.close(); - } - try { - this.htable.close(); - } catch (IOException ioe) { - LOG.warn("Error closing table", ioe); - } - } - - /** - * Returns the current key. - * - * @return The current key. - * @throws IOException - * @throws InterruptedException When the job is aborted. - */ - public ImmutableBytesWritable getCurrentKey() throws IOException, - InterruptedException { - return key; - } - - /** - * Returns the current value. - * - * @return The current value. - * @throws IOException When the value is faulty. - * @throws InterruptedException When the job is aborted. - */ - public Result getCurrentValue() throws IOException, InterruptedException { - return value; - } - - - /** - * Positions the record reader to the next record. - * - * @return <code>true</code> if there was another record. - * @throws IOException When reading the record failed. - * @throws InterruptedException When the job was aborted. 
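The retrieveGetCounterWithStringsParams() probe above exists because older MapReduce APIs do not expose getCounter(String, String) on TaskAttemptContext. The following standalone sketch (not HBase code) restates the same pattern: resolve the method reflectively, update a counter when it is present, and skip silently when it is not; the group and counter names are illustrative.

import java.lang.reflect.Method;

import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

final class CounterProbe {

  // Increments context.getCounter(group, name) when the running MapReduce
  // version offers the two-string variant; does nothing when it does not.
  static void bumpIfAvailable(TaskAttemptContext context, String group, String name) {
    try {
      Method getCounter = context.getClass().getMethod("getCounter", String.class, String.class);
      ((Counter) getCounter.invoke(context, group, name)).increment(1);
    } catch (NoSuchMethodException e) {
      // Older MapReduce API: no String/String counters, so updates are skipped.
    } catch (ReflectiveOperationException e) {
      throw new RuntimeException("Counter lookup failed unexpectedly", e);
    }
  }
}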
- */ - public boolean nextKeyValue() throws IOException, InterruptedException { - if (key == null) key = new ImmutableBytesWritable(); - if (value == null) value = new Result(); - try { - try { - value = this.scanner.next(); - if (value != null && value.isStale()) numStale++; - if (logScannerActivity) { - rowcount ++; - if (rowcount >= logPerRowCount) { - long now = System.currentTimeMillis(); - LOG.info("Mapper took " + (now-timestamp) - + "ms to process " + rowcount + " rows"); - timestamp = now; - rowcount = 0; - } - } - } catch (IOException e) { - // do not retry if the exception tells us not to do so - if (e instanceof DoNotRetryIOException) { - throw e; - } - // try to handle all other IOExceptions by restarting - // the scanner, if the second call fails, it will be rethrown - LOG.info("recovered from " + StringUtils.stringifyException(e)); - if (lastSuccessfulRow == null) { - LOG.warn("We are restarting the first next() invocation," + - " if your mapper has restarted a few other times like this" + - " then you should consider killing this job and investigate" + - " why it's taking so long."); - } - if (lastSuccessfulRow == null) { - restart(scan.getStartRow()); - } else { - restart(lastSuccessfulRow); - scanner.next(); // skip presumed already mapped row - } - value = scanner.next(); - if (value != null && value.isStale()) numStale++; - numRestarts++; - } - if (value != null && value.size() > 0) { - key.set(value.getRow()); - lastSuccessfulRow = key.get(); - return true; - } - - updateCounters(); - return false; - } catch (IOException ioe) { - if (logScannerActivity) { - long now = System.currentTimeMillis(); - LOG.info("Mapper took " + (now-timestamp) - + "ms to process " + rowcount + " rows"); - LOG.info(ioe); - String lastRow = lastSuccessfulRow == null ? - "null" : Bytes.toStringBinary(lastSuccessfulRow); - LOG.info("lastSuccessfulRow=" + lastRow); - } - throw ioe; - } - } - - /** - * If hbase runs on new version of mapreduce, RecordReader has access to - * counters thus can update counters based on scanMetrics. - * If hbase runs on old version of mapreduce, it won't be able to get - * access to counters and TableRecorderReader can't update counter values. - * @throws IOException - */ - private void updateCounters() throws IOException { - ScanMetrics scanMetrics = scanner.getScanMetrics(); - if (scanMetrics == null) { - return; - } - - updateCounters(scanMetrics, numRestarts, getCounter, context, numStale); - } - - protected static void updateCounters(ScanMetrics scanMetrics, long numScannerRestarts, - Method getCounter, TaskAttemptContext context, long numStale) { - // we can get access to counters only if hbase uses new mapreduce APIs - if (getCounter == null) { - return; - } - - try { - for (Map.Entry<String, Long> entry:scanMetrics.getMetricsMap().entrySet()) { - Counter ct = (Counter)getCounter.invoke(context, - HBASE_COUNTER_GROUP_NAME, entry.getKey()); - - ct.increment(entry.getValue()); - } - ((Counter) getCounter.invoke(context, HBASE_COUNTER_GROUP_NAME, - "NUM_SCANNER_RESTARTS")).increment(numScannerRestarts); - ((Counter) getCounter.invoke(context, HBASE_COUNTER_GROUP_NAME, - "NUM_SCAN_RESULTS_STALE")).increment(numStale); - } catch (Exception e) { - LOG.debug("can't update counter." + StringUtils.stringifyException(e)); - } - } - - /** - * The current progress of the record reader through its data. - * - * @return A number between 0.0 and 1.0, the fraction of the data read. 
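updateCounters() above exports the scanner's ScanMetrics into the "HBase Counters" MapReduce counter group, alongside NUM_SCANNER_RESTARTS and NUM_SCAN_RESULTS_STALE. The same metrics can be read from plain client code once metrics collection is enabled on the Scan, as restart() does; a sketch follows, with "scores" as an assumed table name.

import java.util.Map;

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;

public class ScanMetricsDump {
  public static void main(String[] args) throws Exception {
    Scan scan = new Scan();
    scan.setScanMetricsEnabled(true);   // the same switch restart() flips above
    try (Connection conn = ConnectionFactory.createConnection(HBaseConfiguration.create());
         Table table = conn.getTable(TableName.valueOf("scores"));   // assumed table
         ResultScanner scanner = table.getScanner(scan)) {
      for (Result r : scanner) {
        // consume the rows; metrics accumulate as the scan progresses
      }
      ScanMetrics metrics = scanner.getScanMetrics();
      for (Map.Entry<String, Long> e : metrics.getMetricsMap().entrySet()) {
        System.out.println(e.getKey() + " = " + e.getValue());
      }
    }
  }
}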
- */ - public float getProgress() { - // Depends on the total number of tuples - return 0; - } - -} http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableReducer.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableReducer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableReducer.java deleted file mode 100644 index f0bfc74..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableReducer.java +++ /dev/null @@ -1,45 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.mapreduce; - -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.client.Mutation; -import org.apache.hadoop.mapreduce.Reducer; - -/** - * Extends the basic <code>Reducer</code> class to add the required key and - * value input/output classes. While the input key and value as well as the - * output key can be anything handed in from the previous map phase the output - * value <u>must</u> be either a {@link org.apache.hadoop.hbase.client.Put Put} - * or a {@link org.apache.hadoop.hbase.client.Delete Delete} instance when - * using the {@link TableOutputFormat} class. - * <p> - * This class is extended by {@link IdentityTableReducer} but can also be - * subclassed to implement similar features or any custom code needed. It has - * the advantage to enforce the output value to a specific basic type. - * - * @param <KEYIN> The type of the input key. - * @param <VALUEIN> The type of the input value. - * @param <KEYOUT> The type of the output key. - * @see org.apache.hadoop.mapreduce.Reducer - */ [email protected] -public abstract class TableReducer<KEYIN, VALUEIN, KEYOUT> -extends Reducer<KEYIN, VALUEIN, KEYOUT, Mutation> { -} http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormat.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormat.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormat.java deleted file mode 100644 index 7e59c3b..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormat.java +++ /dev/null @@ -1,210 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
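In the spirit of the TableReducer javadoc above, here is a minimal subclass sketch that sums long counts per key and writes the total back as a Put; the input types and the "f:count" column are assumptions for illustration.

import java.io.IOException;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;

public class CountSummingReducer extends TableReducer<Text, LongWritable, ImmutableBytesWritable> {

  // Emits one Put per key; TableOutputFormat applies it to the output table.
  @Override
  protected void reduce(Text key, Iterable<LongWritable> values, Context context)
      throws IOException, InterruptedException {
    long sum = 0;
    for (LongWritable v : values) {
      sum += v.get();
    }
    byte[] row = Bytes.toBytes(key.toString());
    Put put = new Put(row);
    put.addColumn(Bytes.toBytes("f"), Bytes.toBytes("count"), Bytes.toBytes(sum));
    context.write(new ImmutableBytesWritable(row), put);
  }
}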
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hbase.mapreduce; - -import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.HRegionInfo; -import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.client.metrics.ScanMetrics; -import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.mapreduce.InputFormat; -import org.apache.hadoop.mapreduce.InputSplit; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.JobContext; -import org.apache.hadoop.mapreduce.RecordReader; -import org.apache.hadoop.mapreduce.TaskAttemptContext; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import java.lang.reflect.Method; -import java.util.ArrayList; -import java.util.List; - -/** - * TableSnapshotInputFormat allows a MapReduce job to run over a table snapshot. The job - * bypasses HBase servers, and directly accesses the underlying files (hfile, recovered edits, - * wals, etc) directly to provide maximum performance. The snapshot is not required to be - * restored to the live cluster or cloned. This also allows to run the mapreduce job from an - * online or offline hbase cluster. The snapshot files can be exported by using the - * {@link org.apache.hadoop.hbase.snapshot.ExportSnapshot} tool, to a pure-hdfs cluster, - * and this InputFormat can be used to run the mapreduce job directly over the snapshot files. - * The snapshot should not be deleted while there are jobs reading from snapshot files. - * <p> - * Usage is similar to TableInputFormat, and - * {@link TableMapReduceUtil#initTableSnapshotMapperJob(String, Scan, Class, Class, Class, Job, - * boolean, Path)} - * can be used to configure the job. - * <pre>{@code - * Job job = new Job(conf); - * Scan scan = new Scan(); - * TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName, - * scan, MyTableMapper.class, MyMapKeyOutput.class, - * MyMapOutputValueWritable.class, job, true); - * } - * </pre> - * <p> - * Internally, this input format restores the snapshot into the given tmp directory. Similar to - * {@link TableInputFormat} an InputSplit is created per region. The region is opened for reading - * from each RecordReader. An internal RegionScanner is used to execute the - * {@link org.apache.hadoop.hbase.CellScanner} obtained from the user. - * <p> - * HBase owns all the data and snapshot files on the filesystem. Only the 'hbase' user can read from - * snapshot files and data files. - * To read from snapshot files directly from the file system, the user who is running the MR job - * must have sufficient permissions to access snapshot and reference files. 
- * This means that to run mapreduce over snapshot files, the MR job has to be run as the HBase - * user or the user must have group or other privileges in the filesystem (See HBASE-8369). - * Note that, given other users access to read from snapshot/data files will completely circumvent - * the access control enforced by HBase. - * @see org.apache.hadoop.hbase.client.TableSnapshotScanner - */ [email protected] -public class TableSnapshotInputFormat extends InputFormat<ImmutableBytesWritable, Result> { - - public static class TableSnapshotRegionSplit extends InputSplit implements Writable { - private TableSnapshotInputFormatImpl.InputSplit delegate; - - // constructor for mapreduce framework / Writable - public TableSnapshotRegionSplit() { - this.delegate = new TableSnapshotInputFormatImpl.InputSplit(); - } - - public TableSnapshotRegionSplit(TableSnapshotInputFormatImpl.InputSplit delegate) { - this.delegate = delegate; - } - - public TableSnapshotRegionSplit(HTableDescriptor htd, HRegionInfo regionInfo, - List<String> locations, Scan scan, Path restoreDir) { - this.delegate = - new TableSnapshotInputFormatImpl.InputSplit(htd, regionInfo, locations, scan, restoreDir); - } - - @Override - public long getLength() throws IOException, InterruptedException { - return delegate.getLength(); - } - - @Override - public String[] getLocations() throws IOException, InterruptedException { - return delegate.getLocations(); - } - - @Override - public void write(DataOutput out) throws IOException { - delegate.write(out); - } - - @Override - public void readFields(DataInput in) throws IOException { - delegate.readFields(in); - } - - public HRegionInfo getRegionInfo() { - return delegate.getRegionInfo(); - } - - } - - @VisibleForTesting - static class TableSnapshotRegionRecordReader extends - RecordReader<ImmutableBytesWritable, Result> { - private TableSnapshotInputFormatImpl.RecordReader delegate = - new TableSnapshotInputFormatImpl.RecordReader(); - private TaskAttemptContext context; - private Method getCounter; - - @Override - public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, - InterruptedException { - this.context = context; - getCounter = TableRecordReaderImpl.retrieveGetCounterWithStringsParams(context); - delegate.initialize( - ((TableSnapshotRegionSplit) split).delegate, - context.getConfiguration()); - } - - @Override - public boolean nextKeyValue() throws IOException, InterruptedException { - boolean result = delegate.nextKeyValue(); - if (result) { - ScanMetrics scanMetrics = delegate.getScanner().getScanMetrics(); - if (scanMetrics != null && context != null) { - TableRecordReaderImpl.updateCounters(scanMetrics, 0, getCounter, context, 0); - } - } - return result; - } - - @Override - public ImmutableBytesWritable getCurrentKey() throws IOException, InterruptedException { - return delegate.getCurrentKey(); - } - - @Override - public Result getCurrentValue() throws IOException, InterruptedException { - return delegate.getCurrentValue(); - } - - @Override - public float getProgress() throws IOException, InterruptedException { - return delegate.getProgress(); - } - - @Override - public void close() throws IOException { - delegate.close(); - } - } - - @Override - public RecordReader<ImmutableBytesWritable, Result> createRecordReader( - InputSplit split, TaskAttemptContext context) throws IOException { - return new TableSnapshotRegionRecordReader(); - } - - @Override - public List<InputSplit> getSplits(JobContext job) throws IOException, InterruptedException { 
- List<InputSplit> results = new ArrayList<>(); - for (TableSnapshotInputFormatImpl.InputSplit split : - TableSnapshotInputFormatImpl.getSplits(job.getConfiguration())) { - results.add(new TableSnapshotRegionSplit(split)); - } - return results; - } - - /** - * Configures the job to use TableSnapshotInputFormat to read from a snapshot. - * @param job the job to configure - * @param snapshotName the name of the snapshot to read from - * @param restoreDir a temporary directory to restore the snapshot into. Current user should - * have write permissions to this directory, and this should not be a subdirectory of rootdir. - * After the job is finished, restoreDir can be deleted. - * @throws IOException if an error occurs - */ - public static void setInput(Job job, String snapshotName, Path restoreDir) - throws IOException { - TableSnapshotInputFormatImpl.setInput(job.getConfiguration(), snapshotName, restoreDir); - } -}
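To round out the snapshot path, here is a hedged end-to-end sketch of a job configured through TableMapReduceUtil.initTableSnapshotMapperJob, which calls the setInput() shown above and counts rows directly from snapshot files. The snapshot name and restore directory are assumptions; per the javadoc, the restore directory must be writable by the submitting user and must not sit under the HBase root directory.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;

public class SnapshotRowCounter {

  // Counts rows straight from snapshot files, never touching the region servers.
  static class RowCountMapper extends TableMapper<NullWritable, NullWritable> {
    @Override
    protected void map(ImmutableBytesWritable row, Result value, Context context) {
      context.getCounter("snapshot", "rows").increment(1);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    Job job = Job.getInstance(conf, "snapshot-row-counter");
    job.setJarByClass(SnapshotRowCounter.class);
    TableMapReduceUtil.initTableSnapshotMapperJob(
        "scores_snapshot",                 // assumed snapshot name
        new Scan(),
        RowCountMapper.class,
        NullWritable.class, NullWritable.class,
        job,
        true,                              // ship dependency jars via the distributed cache
        new Path("/tmp/snapshot-restore")); // assumed restore dir, outside rootdir
    job.setNumReduceTasks(0);
    job.setOutputFormatClass(NullOutputFormat.class);
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}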
