Repository: hbase
Updated Branches:
  refs/heads/branch-1.0 581770780 -> ecc7fb8ed


HBASE-13028 Cleanup MapReduce InputFormats

Conflicts:
        
hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableInputFormat.java
        
hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableInputFormatBase.java
        
hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/ecc7fb8e
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/ecc7fb8e
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/ecc7fb8e

Branch: refs/heads/branch-1.0
Commit: ecc7fb8ed13d8c6b68235f75a97a5e2ad24d9755
Parents: 5817707
Author: Sean Busbey <bus...@apache.org>
Authored: Fri Feb 13 15:47:11 2015 -0600
Committer: Sean Busbey <bus...@cloudera.com>
Committed: Sat Feb 14 14:32:26 2015 -0600

----------------------------------------------------------------------
 .../hadoop/hbase/mapred/TableInputFormat.java   |  20 +-
 .../hbase/mapred/TableInputFormatBase.java      | 187 ++++++++++++++++---
 .../hbase/mapreduce/TableInputFormat.java       |   4 +-
 .../hbase/mapreduce/TableInputFormatBase.java   | 129 ++++++++-----
 .../hbase/mapred/TestTableInputFormat.java      |  72 ++++++-
 .../hbase/mapreduce/TestTableInputFormat.java   |  63 +++++--
 6 files changed, 376 insertions(+), 99 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/ecc7fb8e/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableInputFormat.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableInputFormat.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableInputFormat.java
index e1220fb..a7d23d4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableInputFormat.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableInputFormat.java
@@ -27,7 +27,8 @@ import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.JobConf;
@@ -49,6 +50,15 @@ public class TableInputFormat extends TableInputFormatBase implements
   public static final String COLUMN_LIST = "hbase.mapred.tablecolumns";
 
   public void configure(JobConf job) {
+    try {
+      initialize(job);
+    } catch (Exception e) {
+      LOG.error(StringUtils.stringifyException(e));
+    }
+  }
+
+  @Override
+  protected void initialize(JobConf job) throws IOException {
     Path[] tableNames = FileInputFormat.getInputPaths(job);
     String colArg = job.get(COLUMN_LIST);
     String[] colNames = colArg.split(" ");
@@ -57,12 +67,8 @@ public class TableInputFormat extends TableInputFormatBase implements
       m_cols[i] = Bytes.toBytes(colNames[i]);
     }
     setInputColumns(m_cols);
-    try {
-      setHTable(
-          new HTable(HBaseConfiguration.create(job), TableName.valueOf(tableNames[0].getName())));
-    } catch (Exception e) {
-      LOG.error(StringUtils.stringifyException(e));
-    }
+    Connection connection = ConnectionFactory.createConnection(job);
+    initializeTable(connection, TableName.valueOf(tableNames[0].getName()));
   }
 
   public void validateInput(JobConf job) throws IOException {

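As a quick orientation for readers of this patch (not part of the change itself): the rewritten configure()/initialize(JobConf) above pulls the table name from the job's first input path and the scanned columns from COLUMN_LIST. A minimal, hypothetical driver-side sketch of that contract, with made-up table and column names, could look like:

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.mapred.TableInputFormat;
    import org.apache.hadoop.mapred.FileInputFormat;
    import org.apache.hadoop.mapred.JobConf;

    public class ExampleDriverSetup {
      static JobConf configureInput() {
        JobConf job = new JobConf(HBaseConfiguration.create());
        job.setInputFormat(TableInputFormat.class);
        // initialize(JobConf) reads the table name from the first input path ...
        FileInputFormat.setInputPaths(job, new Path("exampleTable"));
        // ... and the space-separated column list from COLUMN_LIST ("hbase.mapred.tablecolumns").
        job.set(TableInputFormat.COLUMN_LIST, "columnA columnB");
        return job;
      }
    }
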
http://git-wip-us.apache.org/repos/asf/hbase/blob/ecc7fb8e/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableInputFormatBase.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableInputFormatBase.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableInputFormatBase.java
index b181dac..da457cf 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableInputFormatBase.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableInputFormatBase.java
@@ -18,6 +18,7 @@
  */
 package org.apache.hadoop.hbase.mapred;
 
+import java.io.Closeable;
 import java.io.IOException;
 
 import org.apache.commons.logging.Log;
@@ -25,8 +26,11 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.mapred.InputFormat;
@@ -39,28 +43,35 @@ import org.apache.hadoop.mapred.Reporter;
  * A Base for {@link TableInputFormat}s. Receives a {@link HTable}, a
  * byte[] of input columns and optionally a {@link Filter}.
  * Subclasses may use other TableRecordReader implementations.
+ *
+ * Subclasses MUST ensure initializeTable(Connection, TableName) is called for an instance to
+ * function properly. Each of the entry points to this class used by the MapReduce framework,
+ * {@link #getRecordReader(InputSplit, JobConf, Reporter)} and {@link #getSplits(JobConf, int)},
+ * will call {@link #initialize(JobConf)} as a convenient centralized location to handle
+ * retrieving the necessary configuration information. If your subclass overrides either of these
+ * methods, either call the parent version or call initialize yourself.
+ *
  * <p>
  * An example of a subclass:
  * <pre>
- *   class ExampleTIF extends TableInputFormatBase implements JobConfigurable {
+ *   class ExampleTIF extends TableInputFormatBase {
  *
- *     @Override
- *     public void configure(JobConf job) {
- *       try {
- *         HTable exampleTable = new HTable(HBaseConfiguration.create(job),
- *           Bytes.toBytes("exampleTable"));
- *         // mandatory
- *         setHTable(exampleTable);
- *         byte[][] inputColumns = new byte [][] { Bytes.toBytes("columnA"),
- *           Bytes.toBytes("columnB") };
- *         // mandatory
- *         setInputColumns(inputColumns);
- *         // optional
- *         Filter exampleFilter = new RowFilter(CompareOp.EQUAL, new RegexStringComparator("aa.*"));
- *         setRowFilter(exampleFilter);
- *       } catch (IOException exception) {
- *         throw new RuntimeException("Failed to configure for job.", exception);
- *       }
+ *     {@literal @}Override
+ *     protected void initialize(JobConf job) throws IOException {
+ *       // We are responsible for the lifecycle of this connection until we hand it over in
+ *       // initializeTable.
+ *       Connection connection =
+ *          ConnectionFactory.createConnection(HBaseConfiguration.create(job));
+ *       TableName tableName = TableName.valueOf("exampleTable");
+ *       // mandatory. once passed here, TableInputFormatBase will handle closing the connection.
+ *       initializeTable(connection, tableName);
+ *       byte[][] inputColumns = new byte [][] { Bytes.toBytes("columnA"),
+ *         Bytes.toBytes("columnB") };
+ *       // mandatory
+ *       setInputColumns(inputColumns);
+ *       // optional, by default we'll get everything for the given columns.
+ *       Filter exampleFilter = new RowFilter(CompareOp.EQUAL, new RegexStringComparator("aa.*"));
+ *       setRowFilter(exampleFilter);
  *     }
  *   }
  * </pre>
@@ -73,9 +84,17 @@ implements InputFormat<ImmutableBytesWritable, Result> {
   private static final Log LOG = LogFactory.getLog(TableInputFormatBase.class);
   private byte [][] inputColumns;
   private HTable table;
+  private Connection connection;
   private TableRecordReader tableRecordReader;
   private Filter rowFilter;
 
+  private static final String NOT_INITIALIZED = "The input format instance has not been properly " +
+      "initialized. Ensure you call initializeTable either in your constructor or initialize " +
+      "method";
+  private static final String INITIALIZATION_ERROR = "Cannot create a record reader because of a" +
+            " previous error. Please look at the previous logs lines from" +
+            " the task's full log for more details.";
+
   /**
    * Builds a TableRecordReader. If no TableRecordReader was provided, uses
    * the default.
@@ -86,19 +105,63 @@ implements InputFormat<ImmutableBytesWritable, Result> {
   public RecordReader<ImmutableBytesWritable, Result> getRecordReader(
       InputSplit split, JobConf job, Reporter reporter)
   throws IOException {
+    // In case a subclass uses the deprecated approach or calls initializeTable directly
+    if (table == null) {
+      initialize(job);
+    }
+    // null check in case our child overrides getTable to not throw.
+    try {
+      if (getTable() == null) {
+        // initialize() must not have been implemented in the subclass.
+        throw new IOException(INITIALIZATION_ERROR);
+      }
+    } catch (IllegalStateException exception) {
+      throw new IOException(INITIALIZATION_ERROR, exception);
+    }
+
     TableSplit tSplit = (TableSplit) split;
-    TableRecordReader trr = this.tableRecordReader;
     // if no table record reader was provided use default
-    if (trr == null) {
-      trr = new TableRecordReader();
-    }
+    final TableRecordReader trr = this.tableRecordReader == null ? new TableRecordReader() :
+        this.tableRecordReader;
     trr.setStartRow(tSplit.getStartRow());
     trr.setEndRow(tSplit.getEndRow());
     trr.setHTable(this.table);
     trr.setInputColumns(this.inputColumns);
     trr.setRowFilter(this.rowFilter);
     trr.init();
-    return trr;
+    return new RecordReader<ImmutableBytesWritable, Result>() {
+
+      @Override
+      public void close() throws IOException {
+        trr.close();
+        closeTable();
+      }
+
+      @Override
+      public ImmutableBytesWritable createKey() {
+        return trr.createKey();
+      }
+
+      @Override
+      public Result createValue() {
+        return trr.createValue();
+      }
+
+      @Override
+      public long getPos() throws IOException {
+        return trr.getPos();
+      }
+
+      @Override
+      public float getProgress() throws IOException {
+        return trr.getProgress();
+      }
+
+      @Override
+      public boolean next(ImmutableBytesWritable key, Result value) throws IOException {
+        return trr.next(key, value);
+      }
+    };
   }
 
   /**
@@ -121,8 +184,18 @@ implements InputFormat<ImmutableBytesWritable, Result> {
    */
   public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
     if (this.table == null) {
-      throw new IOException("No table was provided");
+      initialize(job);
     }
+    // null check in case our child overrides getTable to not throw.
+    try {
+      if (getTable() == null) {
+        // initialize() must not have been implemented in the subclass.
+        throw new IOException(INITIALIZATION_ERROR);
+      }
+    } catch (IllegalStateException exception) {
+      throw new IOException(INITIALIZATION_ERROR, exception);
+    }
+
     byte [][] startKeys = this.table.getStartKeys();
     if (startKeys == null || startKeys.length == 0) {
       throw new IOException("Expecting at least one region");
@@ -150,6 +223,22 @@ implements InputFormat<ImmutableBytesWritable, Result> {
   }
 
   /**
+   * Allows subclasses to initialize the table information.
+   *
+   * @param connection  The Connection to the HBase cluster. MUST be unmanaged. We will close.
+   * @param tableName  The {@link TableName} of the table to process.
+   * @throws IOException
+   */
+  protected void initializeTable(Connection connection, TableName tableName) throws IOException {
+    if (table != null || connection != null) {
+      LOG.warn("initializeTable called multiple times. Overwriting connection 
and table " +
+          "reference; TableInputFormatBase will not close these old references 
when done.");
+    }
+    this.table = (HTable) connection.getTable(tableName);
+    this.connection = connection;
+  }
+
+  /**
    * @param inputColumns to be passed in {@link Result} to the map task.
    */
   protected void setInputColumns(byte [][] inputColumns) {
@@ -158,8 +247,20 @@ implements InputFormat<ImmutableBytesWritable, Result> {
 
   /**
    * Allows subclasses to get the {@link HTable}.
+   * @deprecated use {@link #getTable()}
    */
+  @Deprecated
   protected HTable getHTable() {
+    return (HTable) getTable();
+  }
+
+  /**
+   * Allows subclasses to get the {@link Table}.
+   */
+  protected Table getTable() {
+    if (table == null) {
+      throw new IllegalStateException(NOT_INITIALIZED);
+    }
     return this.table;
   }
 
@@ -167,7 +268,9 @@ implements InputFormat<ImmutableBytesWritable, Result> {
    * Allows subclasses to set the {@link HTable}.
    *
    * @param table to get the data from
+   * @deprecated use {@link #initializeTable(Connection,TableName)}
    */
+  @Deprecated
   protected void setHTable(HTable table) {
     this.table = table;
   }
@@ -190,4 +293,40 @@ implements InputFormat<ImmutableBytesWritable, Result> {
   protected void setRowFilter(Filter rowFilter) {
     this.rowFilter = rowFilter;
   }
+
+  /**
+   * Handle subclass specific set up.
+   * Each of the entry points used by the MapReduce framework,
+   * {@link #getRecordReader(InputSplit, JobConf, Reporter)} and {@link #getSplits(JobConf, int)},
+   * will call {@link #initialize(JobConf)} as a convenient centralized location to handle
+   * retrieving the necessary configuration information and calling
+   * {@link #initializeTable(Connection, TableName)}.
+   *
+   * Subclasses should implement their initialize call such that it is safe to call multiple times.
+   * The current TableInputFormatBase implementation relies on a non-null table reference to decide
+   * if an initialize call is needed, but this behavior may change in the future. In particular,
+   * it is critical that initializeTable not be called multiple times since this will leak
+   * Connection instances.
+   *
+   */
+  protected void initialize(JobConf job) throws IOException {
+  }
+
+  /**
+   * Close the Table and related objects that were initialized via
+   * {@link #initializeTable(Connection, TableName)}.
+   *
+   * @throws IOException
+   */
+  protected void closeTable() throws IOException {
+    close(table, connection);
+    table = null;
+    connection = null;
+  }
+
+  private void close(Closeable... closables) throws IOException {
+    for (Closeable c : closables) {
+      if(c != null) { c.close(); }
+    }
+  }
 }

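The new class javadoc above spells out the contract for subclasses: initializeTable must run before either framework entry point relies on the table. A brief sketch (not from this patch; the class, table, and column names are invented) of a subclass that uses the new initialize(JobConf) hook and still overrides getSplits safely:

    import java.io.IOException;

    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    import org.apache.hadoop.hbase.mapred.TableInputFormatBase;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.mapred.InputSplit;
    import org.apache.hadoop.mapred.JobConf;

    public class NarrowColumnTIF extends TableInputFormatBase {

      @Override
      protected void initialize(JobConf job) throws IOException {
        // Ownership of the connection passes to the base class once initializeTable is called.
        Connection connection = ConnectionFactory.createConnection(HBaseConfiguration.create(job));
        initializeTable(connection, TableName.valueOf("exampleTable"));
        setInputColumns(new byte[][] { Bytes.toBytes("columnA") });
      }

      @Override
      public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
        // Overriding an entry point is fine as long as the parent version (or initialize) still runs.
        return super.getSplits(job, numSplits);
      }
    }
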
http://git-wip-us.apache.org/repos/asf/hbase/blob/ecc7fb8e/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java
index 8896eb0..bc2537b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java
@@ -175,7 +175,9 @@ implements Configurable {
   }
 
   @Override
-  protected void initialize() {
+  protected void initialize(JobContext context) throws IOException {
+    // Do we have to worry about mis-matches between the Configuration from setConf and the one
+    // in this context?
     TableName tableName = TableName.valueOf(conf.get(INPUT_TABLE));
     try {
       initializeTable(ConnectionFactory.createConnection(new Configuration(conf)), tableName);

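For context only: the mapreduce-side TableInputFormat is still configured entirely through its Configuration (the table name under INPUT_TABLE plus the serialized Scan), so common job setup via TableMapReduceUtil keeps working unchanged. A hedged sketch, with an invented table name and a caller-supplied mapper class:

    import java.io.IOException;

    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
    import org.apache.hadoop.hbase.mapreduce.TableMapper;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.mapreduce.Job;

    public class ExampleJobSetup {
      static Job createScanJob(Class<? extends TableMapper<ImmutableBytesWritable, NullWritable>> mapper)
          throws IOException {
        Job job = Job.getInstance(HBaseConfiguration.create(), "scan exampleTable");
        // Stores the table name (INPUT_TABLE) and the serialized Scan in the job configuration;
        // initialize(JobContext) above later reads the table name back out of that configuration.
        TableMapReduceUtil.initTableMapperJob("exampleTable", new Scan(), mapper,
            ImmutableBytesWritable.class, NullWritable.class, job);
        return job;
      }
    }
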
http://git-wip-us.apache.org/repos/asf/hbase/blob/ecc7fb8e/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java
index 07830dd..7becdb4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java
@@ -63,38 +63,39 @@ import org.apache.hadoop.util.StringUtils;
  * A base for {@link TableInputFormat}s. Receives a {@link Connection}, a {@link TableName},
  * an {@link Scan} instance that defines the input columns etc. Subclasses may use
  * other TableRecordReader implementations.
+ *
+ * Subclasses MUST ensure initializeTable(Connection, TableName) is called for an instance to
+ * function properly. Each of the entry points to this class used by the MapReduce framework,
+ * {@link #createRecordReader(InputSplit, TaskAttemptContext)} and {@link #getSplits(JobContext)},
+ * will call {@link #initialize(JobContext)} as a convenient centralized location to handle
+ * retrieving the necessary configuration information. If your subclass overrides either of these
+ * methods, either call the parent version or call initialize yourself.
+ *
  * <p>
  * An example of a subclass:
  * <pre>
- *   class ExampleTIF extends TableInputFormatBase implements JobConfigurable {
- *
- *     private JobConf job;
+ *   class ExampleTIF extends TableInputFormatBase {
  *
- *     @Override
- *     public void configure(JobConf job) {
- *       try {
- *         this.job = job;
- *         byte[][] inputColumns = new byte [][] { Bytes.toBytes("columnA"),
- *           Bytes.toBytes("columnB") };
- *         // optional
- *         Scan scan = new Scan();
- *         for (byte[] family : inputColumns) {
- *           scan.addFamily(family);
- *         }
- *         Filter exampleFilter = new RowFilter(CompareOp.EQUAL, new RegexStringComparator("aa.*"));
- *         scan.setFilter(exampleFilter);
- *         setScan(scan);
- *       } catch (IOException exception) {
- *         throw new RuntimeException("Failed to configure for job.", exception);
- *       }
- *
- *     protected void initialize() {
- *       Connection connection =
- *          ConnectionFactory.createConnection(HBaseConfiguration.create(job));
+ *     {@literal @}Override
+ *     protected void initialize(JobContext context) throws IOException {
+ *       // We are responsible for the lifecycle of this connection until we hand it over in
+ *       // initializeTable.
+ *       Connection connection = ConnectionFactory.createConnection(HBaseConfiguration.create(
+ *              context.getConfiguration()));
  *       TableName tableName = TableName.valueOf("exampleTable");
- *       // mandatory
+ *       // mandatory. once passed here, TableInputFormatBase will handle closing the connection.
  *       initializeTable(connection, tableName);
- *    }
+ *       byte[][] inputColumns = new byte [][] { Bytes.toBytes("columnA"),
+ *         Bytes.toBytes("columnB") };
+ *       // optional, by default we'll get everything for the table.
+ *       Scan scan = new Scan();
+ *       for (byte[] family : inputColumns) {
+ *         scan.addFamily(family);
+ *       }
+ *       Filter exampleFilter = new RowFilter(CompareOp.EQUAL, new RegexStringComparator("aa.*"));
+ *       scan.setFilter(exampleFilter);
+ *       setScan(scan);
+ *     }
  *   }
  * </pre>
  */
@@ -105,6 +106,13 @@ extends InputFormat<ImmutableBytesWritable, Result> {
 
   final Log LOG = LogFactory.getLog(TableInputFormatBase.class);
 
+  private static final String NOT_INITIALIZED = "The input format instance has not been properly " +
+      "initialized. Ensure you call initializeTable either in your constructor or initialize " +
+      "method";
+  private static final String INITIALIZATION_ERROR = "Cannot create a record reader because of a" +
+            " previous error. Please look at the previous logs lines from" +
+            " the task's full log for more details.";
+
   /** Holds the details for the internal scanner.
    *
    * @see Scan */
@@ -141,14 +149,18 @@ extends InputFormat<ImmutableBytesWritable, Result> {
   public RecordReader<ImmutableBytesWritable, Result> createRecordReader(
       InputSplit split, TaskAttemptContext context)
   throws IOException {
+    // Just in case a subclass is relying on JobConfigurable magic.
     if (table == null) {
-      initialize();
+      initialize(context);
     }
-    if (getTable() == null) {
-      // initialize() must not have been implemented in the subclass.
-      throw new IOException("Cannot create a record reader because of a" +
-          " previous error. Please look at the previous logs lines from" +
-          " the task's full log for more details.");
+    // null check in case our child overrides getTable to not throw.
+    try {
+      if (getTable() == null) {
+        // initialize() must not have been implemented in the subclass.
+        throw new IOException(INITIALIZATION_ERROR);
+      }
+    } catch (IllegalStateException exception) {
+      throw new IOException(INITIALIZATION_ERROR, exception);
     }
     TableSplit tSplit = (TableSplit) split;
     LOG.info("Input split length: " + 
StringUtils.humanReadableInt(tSplit.getLength()) + " bytes.");
@@ -213,13 +225,20 @@ extends InputFormat<ImmutableBytesWritable, Result> {
   public List<InputSplit> getSplits(JobContext context) throws IOException {
     boolean closeOnFinish = false;
 
+    // Just in case a subclass is relying on JobConfigurable magic.
     if (table == null) {
-      initialize();
+      initialize(context);
       closeOnFinish = true;
     }
-    if (table == null) {
-      // initialize() wasn't implemented, so the table is null.
-      throw new IOException("No table was provided.");
+
+    // null check in case our child overrides getTable to not throw.
+    try {
+      if (getTable() == null) {
+        // initialize() must not have been implemented in the subclass.
+        throw new IOException(INITIALIZATION_ERROR);
+      }
+    } catch (IllegalStateException exception) {
+      throw new IOException(INITIALIZATION_ERROR, exception);
     }
 
     try {
@@ -293,6 +312,10 @@ extends InputFormat<ImmutableBytesWritable, Result> {
     }
   }
 
+  /**
+   * @deprecated mistakenly made public in 0.98.7. scope will change to package-private
+   */
+  @Deprecated
  public String reverseDNS(InetAddress ipAddress) throws NamingException, UnknownHostException {
     String hostName = this.reverseDNSCacheMap.get(ipAddress);
     if (hostName == null) {
@@ -341,7 +364,7 @@ extends InputFormat<ImmutableBytesWritable, Result> {
   /**
    * Allows subclasses to get the {@link HTable}.
    *
-   * @deprecated
+   * @deprecated use {@link #getTable()}
    */
   @Deprecated
   protected HTable getHTable() {
@@ -353,7 +376,7 @@ extends InputFormat<ImmutableBytesWritable, Result> {
    */
   protected RegionLocator getRegionLocator() {
     if (regionLocator == null) {
-      initialize();
+      throw new IllegalStateException(NOT_INITIALIZED);
     }
     return regionLocator;
   }
@@ -363,7 +386,7 @@ extends InputFormat<ImmutableBytesWritable, Result> {
    */
   protected Table getTable() {
     if (table == null) {
-      initialize();
+      throw new IllegalStateException(NOT_INITIALIZED);
     }
     return table;
   }
@@ -373,7 +396,7 @@ extends InputFormat<ImmutableBytesWritable, Result> {
    */
   protected Admin getAdmin() {
     if (admin == null) {
-      initialize();
+      throw new IllegalStateException(NOT_INITIALIZED);
     }
     return admin;
   }
@@ -381,6 +404,9 @@ extends InputFormat<ImmutableBytesWritable, Result> {
   /**
    * Allows subclasses to set the {@link HTable}.
    *
+   * Will attempt to reuse the underlying Connection for our own needs, including
+   * retrieving an Admin interface to the HBase cluster.
+   *
    * @param table  The table to get the data from.
    * @throws IOException 
    * @deprecated Use {@link #initializeTable(Connection, TableName)} instead.
@@ -417,6 +443,10 @@ extends InputFormat<ImmutableBytesWritable, Result> {
    * @throws IOException 
    */
  protected void initializeTable(Connection connection, TableName tableName) throws IOException {
+    if (table != null || connection != null) {
+      LOG.warn("initializeTable called multiple times. Overwriting connection 
and table " +
+          "reference; TableInputFormatBase will not close these old references 
when done.");
+    }
     this.table = connection.getTable(tableName);
     this.regionLocator = connection.getRegionLocator(tableName);
     this.admin = connection.getAdmin();
@@ -453,12 +483,21 @@ extends InputFormat<ImmutableBytesWritable, Result> {
   }
   
   /**
-   * This method will be called when any of the following are referenced, but not yet initialized:
-   * admin, regionLocator, table. Subclasses will have the opportunity to call
-   * {@link #initializeTable(Connection, TableName)}
+   * Handle subclass specific set up.
+   * Each of the entry points used by the MapReduce framework,
+   * {@link #createRecordReader(InputSplit, TaskAttemptContext)} and {@link #getSplits(JobContext)},
+   * will call {@link #initialize(JobContext)} as a convenient centralized location to handle
+   * retrieving the necessary configuration information and calling
+   * {@link #initializeTable(Connection, TableName)}.
+   *
+   * Subclasses should implement their initialize call such that it is safe to call multiple times.
+   * The current TableInputFormatBase implementation relies on a non-null table reference to decide
+   * if an initialize call is needed, but this behavior may change in the future. In particular,
+   * it is critical that initializeTable not be called multiple times since this will leak
+   * Connection instances.
+   *
    */
-  protected void initialize() {
-   
+  protected void initialize(JobContext context) throws IOException {
   }
 
   /**

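One behavioral consequence worth noting from the hunks above: getTable(), getAdmin() and getRegionLocator() now throw IllegalStateException instead of quietly calling initialize(). A small sketch (class and table names invented, not part of the patch) of how subclass code that used to rely on that lazy initialization could adapt:

    import java.io.IOException;

    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    import org.apache.hadoop.hbase.mapreduce.TableInputFormatBase;
    import org.apache.hadoop.mapreduce.JobContext;

    public class EagerInitTIF extends TableInputFormatBase {

      @Override
      protected void initialize(JobContext context) throws IOException {
        Connection connection =
            ConnectionFactory.createConnection(HBaseConfiguration.create(context.getConfiguration()));
        initializeTable(connection, TableName.valueOf("exampleTable"));
      }

      // Replacement for code that previously counted on getTable() initializing the format lazily.
      void ensureInitialized(JobContext context) throws IOException {
        try {
          getTable();
        } catch (IllegalStateException notYetInitialized) {
          initialize(context);
        }
      }
    }
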
http://git-wip-us.apache.org/repos/asf/hbase/blob/ecc7fb8e/hbase-server/src/test/java/org/apache/hadoop/hbase/mapred/TestTableInputFormat.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapred/TestTableInputFormat.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapred/TestTableInputFormat.java
index 4626b61..edc8cbe 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapred/TestTableInputFormat.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapred/TestTableInputFormat.java
@@ -36,6 +36,8 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.*;
 import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
@@ -51,6 +53,7 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.JobConfigurable;
@@ -321,8 +324,30 @@ public class TestTableInputFormat {
     LOG.info("testing use of an InputFormat taht extends InputFormatBase");
     final Table table = createTable(Bytes.toBytes("exampleTable"),
       new byte[][] { Bytes.toBytes("columnA"), Bytes.toBytes("columnB") });
+    testInputFormat(ExampleTIF.class);
+  }
+
+  @Test
+  public void testDeprecatedExtensionOfTableInputFormatBase() throws IOException {
+    LOG.info("testing use of an InputFormat taht extends InputFormatBase, "
+        + "as it was given in 0.98.");
+    final Table table = createTable(Bytes.toBytes("exampleDeprecatedTable"),
+      new byte[][] { Bytes.toBytes("columnA"), Bytes.toBytes("columnB") });
+    testInputFormat(ExampleDeprecatedTIF.class);
+  }
+
+  @Test
+  public void testJobConfigurableExtensionOfTableInputFormatBase() throws IOException {
+    LOG.info("testing use of an InputFormat taht extends InputFormatBase, "
+        + "using JobConfigurable.");
+    final Table table = createTable(Bytes.toBytes("exampleJobConfigurableTable"),
+      new byte[][] { Bytes.toBytes("columnA"), Bytes.toBytes("columnB") });
+    testInputFormat(ExampleJobConfigurableTIF.class);
+  }
+
+  void testInputFormat(Class<? extends InputFormat> clazz) throws IOException {
     final JobConf job = MapreduceTestingShim.getJobConf(mrCluster);
-    job.setInputFormat(ExampleTIF.class);
+    job.setInputFormat(clazz);
     job.setOutputFormat(NullOutputFormat.class);
     job.setMapperClass(ExampleVerifier.class);
     job.setNumReduceTasks(0);
@@ -372,13 +397,13 @@ public class TestTableInputFormat {
 
   }
 
-  public static class ExampleTIF extends TableInputFormatBase implements JobConfigurable {
+  public static class ExampleDeprecatedTIF extends TableInputFormatBase implements JobConfigurable {
 
     @Override
     public void configure(JobConf job) {
       try {
         HTable exampleTable = new HTable(HBaseConfiguration.create(job),
-          Bytes.toBytes("exampleTable"));
+          Bytes.toBytes("exampleDeprecatedTable"));
         // mandatory
         setHTable(exampleTable);
         byte[][] inputColumns = new byte [][] { Bytes.toBytes("columnA"),
@@ -395,5 +420,46 @@ public class TestTableInputFormat {
 
   }
 
+  public static class ExampleJobConfigurableTIF extends ExampleTIF implements JobConfigurable {
+
+    @Override
+    public void configure(JobConf job) {
+      try {
+        initialize(job);
+      } catch (IOException exception) {
+        throw new RuntimeException("Failed to initialize.", exception);
+      }
+    }
+
+    @Override
+    protected void initialize(JobConf job) throws IOException {
+      initialize(job, "exampleJobConfigurableTable");
+    }
+  }
+
+
+  public static class ExampleTIF extends TableInputFormatBase {
+
+    @Override
+    protected void initialize(JobConf job) throws IOException {
+      initialize(job, "exampleTable");
+    }
+
+    protected void initialize(JobConf job, String table) throws IOException {
+      Connection connection = ConnectionFactory.createConnection(HBaseConfiguration.create(job));
+      TableName tableName = TableName.valueOf(table);
+      // mandatory
+      initializeTable(connection, tableName);
+      byte[][] inputColumns = new byte [][] { Bytes.toBytes("columnA"),
+        Bytes.toBytes("columnB") };
+      // mandatory
+      setInputColumns(inputColumns);
+      Filter exampleFilter = new RowFilter(CompareOp.EQUAL, new RegexStringComparator("aa.*"));
+      // optional
+      setRowFilter(exampleFilter);
+    }
+
+  }
+
 }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/ecc7fb8e/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormat.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormat.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormat.java
index 2602961..566a642 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormat.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormat.java
@@ -56,6 +56,7 @@ import org.apache.hadoop.mapred.JobConfigurable;
 import org.apache.hadoop.mapred.MiniMRCluster;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.Mapper.Context;
 import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
 import org.junit.AfterClass;
@@ -343,6 +344,16 @@ public class TestTableInputFormat {
   }
 
   @Test
+  public void testJobConfigurableExtensionOfTableInputFormatBase()
+      throws IOException, InterruptedException, ClassNotFoundException {
+    LOG.info("testing use of an InputFormat taht extends InputFormatBase, " +
+        "using JobConfigurable.");
+    final Table htable = createTable(Bytes.toBytes("exampleJobConfigurableTable"),
+      new byte[][] { Bytes.toBytes("columnA"), Bytes.toBytes("columnB") });
+    testInputFormat(ExampleJobConfigurableTIF.class);
+  }
+
+  @Test
   public void testDeprecatedExtensionOfTableInputFormatBase()
       throws IOException, InterruptedException, ClassNotFoundException {
     LOG.info("testing use of an InputFormat taht extends InputFormatBase, " +
@@ -422,13 +433,43 @@ public class TestTableInputFormat {
 
   }
 
-  public static class ExampleTIF extends TableInputFormatBase implements JobConfigurable {
 
-    private JobConf job;
+  public static class ExampleJobConfigurableTIF extends TableInputFormatBase
+      implements JobConfigurable {
 
     @Override
     public void configure(JobConf job) {
-      this.job = job;
+      try {
+        Connection connection = ConnectionFactory.createConnection(HBaseConfiguration.create(job));
+        TableName tableName = TableName.valueOf("exampleJobConfigurableTable");
+        // mandatory
+        initializeTable(connection, tableName);
+        byte[][] inputColumns = new byte [][] { Bytes.toBytes("columnA"),
+          Bytes.toBytes("columnB") };
+        //optional
+        Scan scan = new Scan();
+        for (byte[] family : inputColumns) {
+          scan.addFamily(family);
+        }
+        Filter exampleFilter = new RowFilter(CompareOp.EQUAL, new RegexStringComparator("aa.*"));
+        scan.setFilter(exampleFilter);
+        setScan(scan);
+      } catch (IOException exception) {
+        throw new RuntimeException("Failed to initialize.", exception);
+      }
+    }
+  }
+
+
+  public static class ExampleTIF extends TableInputFormatBase {
+
+    @Override
+    protected void initialize(JobContext job) throws IOException {
+      Connection connection = ConnectionFactory.createConnection(HBaseConfiguration.create(
+          job.getConfiguration()));
+      TableName tableName = TableName.valueOf("exampleTable");
+      // mandatory
+      initializeTable(connection, tableName);
       byte[][] inputColumns = new byte [][] { Bytes.toBytes("columnA"),
         Bytes.toBytes("columnB") };
       //optional
@@ -441,22 +482,6 @@ public class TestTableInputFormat {
       setScan(scan);
     }
 
-    @Override
-    protected void initialize() {
-      if (job == null) {
-        throw new IllegalStateException("must have already gotten the JobConf 
before initialize " +
-            "is called.");
-      }
-      try {
-        Connection connection = ConnectionFactory.createConnection(HBaseConfiguration.create(job));
-        TableName tableName = TableName.valueOf("exampleTable");
-        // mandatory
-        initializeTable(connection, tableName);
-      } catch (IOException exception) {
-        throw new RuntimeException("Failed to initialize.", exception);
-      }
-    }
-
   }
 }
 
