Repository: hbase Updated Branches: refs/heads/branch-1 b421498ce -> 9b8f59cdf
HBASE-12798 Map Reduce jobs should not create Tables in setConf() (Solomon Duskis) Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/9b8f59cd Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/9b8f59cd Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/9b8f59cd Branch: refs/heads/branch-1 Commit: 9b8f59cdf9a8afe4f86421ac330be941d98ceb0d Parents: b421498 Author: tedyu <[email protected]> Authored: Sun Jan 11 09:22:12 2015 -0800 Committer: tedyu <[email protected]> Committed: Sun Jan 11 09:22:12 2015 -0800 ---------------------------------------------------------------------- .../hbase/mapreduce/TableInputFormat.java | 18 ++-- .../hbase/mapreduce/TableInputFormatBase.java | 94 ++++++++++++++++---- .../hbase/mapreduce/TableOutputFormat.java | 22 +++-- 3 files changed, 102 insertions(+), 32 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/9b8f59cd/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java index 50da9bc..8896eb0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java @@ -49,6 +49,7 @@ import org.apache.hadoop.util.StringUtils; public class TableInputFormat extends TableInputFormatBase implements Configurable { + @SuppressWarnings("hiding") private static final Log LOG = LogFactory.getLog(TableInputFormat.class); /** Job parameter that specifies the input table. 
*/ @@ -112,13 +113,6 @@ implements Configurable { @Override public void setConf(Configuration configuration) { this.conf = configuration; - TableName tableName = TableName.valueOf(conf.get(INPUT_TABLE)); - try { - // NOTE: This connection doesn't currently get closed explicitly. - initializeTable(ConnectionFactory.createConnection(new Configuration(conf)), tableName); - } catch (Exception e) { - LOG.error(StringUtils.stringifyException(e)); - } Scan scan = null; @@ -180,6 +174,16 @@ implements Configurable { setScan(scan); } + @Override + protected void initialize() { + TableName tableName = TableName.valueOf(conf.get(INPUT_TABLE)); + try { + initializeTable(ConnectionFactory.createConnection(new Configuration(conf)), tableName); + } catch (Exception e) { + LOG.error(StringUtils.stringifyException(e)); + } + } + /** * Parses a combined family and qualifier and adds either both or just the * family in case there is no qualifier. This assumes the older colon http://git-wip-us.apache.org/repos/asf/hbase/blob/9b8f59cd/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java index 3bf001b..4f06d31 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java @@ -66,12 +66,10 @@ import org.apache.hadoop.util.StringUtils; * <pre> * class ExampleTIF extends TableInputFormatBase implements JobConfigurable { * + * private JobConf job; + * * public void configure(JobConf job) { - * Connection connection = - * ConnectionFactory.createConnection(HBaseConfiguration.create(job)); - * TableName tableName = TableName.valueOf("exampleTable"); - * // mandatory - * 
initializeTable(connection, tableName); + * this.job = job; * Text[] inputColumns = new byte [][] { Bytes.toBytes("cf1:columnA"), * Bytes.toBytes("cf2") }; * // mandatory @@ -80,6 +78,14 @@ import org.apache.hadoop.util.StringUtils; * // optional * setRowFilter(exampleFilter); * } + * + * protected void initialize() { + * Connection connection = + * ConnectionFactory.createConnection(HBaseConfiguration.create(job)); + * TableName tableName = TableName.valueOf("exampleTable"); + * // mandatory + * initializeTable(connection, tableName); + * } * * public void validateInput(JobConf job) throws IOException { * } @@ -105,13 +111,14 @@ extends InputFormat<ImmutableBytesWritable, Result> { private RegionLocator regionLocator; /** The reader scanning the table, can be a custom one. */ private TableRecordReader tableRecordReader = null; + /** The underlying {@link Connection} of the table. */ + private Connection connection; + /** The reverse DNS lookup cache mapping: IPAddress => HostName */ private HashMap<InetAddress, String> reverseDNSCacheMap = new HashMap<InetAddress, String>(); - private Connection connection; - /** * Builds a {@link TableRecordReader}. If no {@link TableRecordReader} was provided, uses * the default. @@ -129,6 +136,10 @@ extends InputFormat<ImmutableBytesWritable, Result> { InputSplit split, TaskAttemptContext context) throws IOException { if (table == null) { + initialize(); + } + if (getTable() == null) { + // initialize() must not have been implemented in the subclass. throw new IOException("Cannot create a record reader because of a" + " previous error. 
Please look at the previous logs lines from" + " the task's full log for more details."); @@ -141,19 +152,13 @@ extends InputFormat<ImmutableBytesWritable, Result> { sc.setStartRow(tSplit.getStartRow()); sc.setStopRow(tSplit.getEndRow()); trr.setScan(sc); - trr.setTable(table); + trr.setTable(getTable()); return new RecordReader<ImmutableBytesWritable, Result>() { @Override public void close() throws IOException { trr.close(); - close(admin, table, regionLocator, connection); - } - - private void close(Closeable... closables) throws IOException { - for (Closeable c : closables) { - if(c != null) { c.close(); } - } + closeTable(); } @Override @@ -185,7 +190,7 @@ extends InputFormat<ImmutableBytesWritable, Result> { } protected Pair<byte[][],byte[][]> getStartEndKeys() throws IOException { - return regionLocator.getStartEndKeys(); + return getRegionLocator().getStartEndKeys(); } /** @@ -200,10 +205,18 @@ extends InputFormat<ImmutableBytesWritable, Result> { */ @Override public List<InputSplit> getSplits(JobContext context) throws IOException { + boolean closeOnFinish = false; + if (table == null) { + initialize(); + closeOnFinish = true; + } + if (table == null) { + // initialize() wasn't implemented, so the table is null. throw new IOException("No table was provided."); } + try { RegionSizeCalculator sizeCalculator = new RegionSizeCalculator(regionLocator, admin); Pair<byte[][], byte[][]> keys = getStartEndKeys(); @@ -267,6 +280,11 @@ extends InputFormat<ImmutableBytesWritable, Result> { } } return splits; + } finally { + if (closeOnFinish) { + closeTable(); + } + } } public String reverseDNS(InetAddress ipAddress) throws NamingException, UnknownHostException { @@ -321,13 +339,16 @@ extends InputFormat<ImmutableBytesWritable, Result> { */ @Deprecated protected HTable getHTable() { - return (HTable) this.table; + return (HTable) this.getTable(); } /** * Allows subclasses to get the {@link RegionLocator}. 
*/ protected RegionLocator getRegionLocator() { + if (regionLocator == null) { + initialize(); + } return regionLocator; } @@ -335,6 +356,9 @@ extends InputFormat<ImmutableBytesWritable, Result> { * Allows subclasses to get the {@link Table}. */ protected Table getTable() { + if (table == null) { + initialize(); + } return table; } @@ -342,6 +366,9 @@ extends InputFormat<ImmutableBytesWritable, Result> { * Allows subclasses to get the {@link Admin}. */ protected Admin getAdmin() { + if (admin == null) { + initialize(); + } return admin; } @@ -356,7 +383,8 @@ extends InputFormat<ImmutableBytesWritable, Result> { protected void setHTable(HTable table) throws IOException { this.table = table; this.regionLocator = table.getRegionLocator(); - this.admin = table.getConnection().getAdmin(); + this.connection = table.getConnection(); + this.admin = this.connection.getAdmin(); } /** @@ -401,4 +429,34 @@ extends InputFormat<ImmutableBytesWritable, Result> { protected void setTableRecordReader(TableRecordReader tableRecordReader) { this.tableRecordReader = tableRecordReader; } + + /** + * This method will be called when any of the following are referenced, but not yet initialized: + * admin, regionLocator, table. Subclasses will have the opportunity to call + * {@link #initializeTable(Connection, TableName)} + */ + protected void initialize() { + + } + + /** + * Close the Table and related objects that were initialized via + * {@link #initializeTable(Connection, TableName)}. + * + * @throws IOException + */ + protected void closeTable() throws IOException { + close(admin, table, regionLocator, connection); + admin = null; + table = null; + regionLocator = null; + connection = null; + } + + private void close(Closeable... 
closables) throws IOException { + for (Closeable c : closables) { + if(c != null) { c.close(); } + } + } + } http://git-wip-us.apache.org/repos/asf/hbase/blob/9b8f59cd/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java index cd69a5b..c46f41f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java @@ -79,15 +79,26 @@ implements Configurable { /** The configuration. */ private Configuration conf = null; - private Table table; - private Connection connection; - /** * Writes the reducer output to an HBase table. */ protected class TableRecordWriter extends RecordWriter<KEY, Mutation> { + private Connection connection; + private Table table; + + /** + * @throws IOException + * + */ + public TableRecordWriter() throws IOException { + String tableName = conf.get(OUTPUT_TABLE); + this.connection = ConnectionFactory.createConnection(conf); + this.table = connection.getTable(TableName.valueOf(tableName)); + this.table.setAutoFlushTo(false); + LOG.info("Created table instance for " + tableName); + } /** * Closes the writer, in this case flush table commits. 
* @@ -165,6 +176,7 @@ implements Configurable { return new TableOutputCommitter(); } + @Override public Configuration getConf() { return conf; } @@ -193,10 +205,6 @@ implements Configurable { if (zkClientPort != 0) { this.conf.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, zkClientPort); } - this.connection = ConnectionFactory.createConnection(this.conf); - this.table = connection.getTable(TableName.valueOf(tableName)); - this.table.setAutoFlushTo(false); - LOG.info("Created table instance for " + tableName); } catch(IOException e) { LOG.error(e); throw new RuntimeException(e);
