Repository: hbase
Updated Branches:
  refs/heads/branch-1 4bd846156 -> c75382860


HBASE-14394 Properly close the connection after reading records from table: 
addendum


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/c7538286
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/c7538286
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/c7538286

Branch: refs/heads/branch-1
Commit: c75382860f793f31599cffc6ad697f8ab58c4eb0
Parents: 4bd8461
Author: Jerry He <jerry...@apache.org>
Authored: Tue Sep 29 19:48:02 2015 -0700
Committer: Jerry He <jerry...@apache.org>
Committed: Tue Sep 29 19:49:28 2015 -0700

----------------------------------------------------------------------
 .../mapreduce/MultiTableInputFormatBase.java    | 53 ++++++++++++++++----
 .../hbase/mapreduce/TableRecordReader.java      |  9 +---
 2 files changed, 45 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/c7538286/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableInputFormatBase.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableInputFormatBase.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableInputFormatBase.java
index 83dca4b..ff690c8 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableInputFormatBase.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableInputFormatBase.java
@@ -29,7 +29,6 @@ import 
org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.RegionLocator;
@@ -92,29 +91,65 @@ public abstract class MultiTableInputFormatBase extends
           + " previous error. Please look at the previous logs lines from"
           + " the task's full log for more details.");
     }
-    Connection connection = 
ConnectionFactory.createConnection(context.getConfiguration());
+    final Connection connection = 
ConnectionFactory.createConnection(context.getConfiguration());
     Table table = connection.getTable(tSplit.getTable());
 
-    TableRecordReader trr = this.tableRecordReader;
+    if (this.tableRecordReader == null) {
+      this.tableRecordReader = new TableRecordReader();
+    }
+    final TableRecordReader trr = this.tableRecordReader;
 
     try {
-      // if no table record reader was provided use default
-      if (trr == null) {
-        trr = new TableRecordReader();
-      }
       Scan sc = tSplit.getScan();
       sc.setStartRow(tSplit.getStartRow());
       sc.setStopRow(tSplit.getEndRow());
       trr.setScan(sc);
       trr.setTable(table);
-      trr.setConnection(connection);
+      return new RecordReader<ImmutableBytesWritable, Result>() {
+
+        @Override
+        public void close() throws IOException {
+          trr.close();
+          if (connection != null) {
+            connection.close();
+          }
+        }
+
+        @Override
+        public ImmutableBytesWritable getCurrentKey() throws IOException, 
InterruptedException {
+          return trr.getCurrentKey();
+        }
+
+        @Override
+        public Result getCurrentValue() throws IOException, 
InterruptedException {
+          return trr.getCurrentValue();
+        }
+
+        @Override
+        public float getProgress() throws IOException, InterruptedException {
+          return trr.getProgress();
+        }
+
+        @Override
+        public void initialize(InputSplit inputsplit, TaskAttemptContext 
context)
+            throws IOException, InterruptedException {
+          trr.initialize(inputsplit, context);
+        }
+
+        @Override
+        public boolean nextKeyValue() throws IOException, InterruptedException 
{
+          return trr.nextKeyValue();
+        }
+      };
     } catch (IOException ioe) {
       // If there is an exception make sure that all
       // resources are closed and released.
       trr.close();
+      if (connection != null) {
+        connection.close();
+      }
       throw ioe;
     }
-    return trr;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/c7538286/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReader.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReader.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReader.java
index 9ff90e7..21dc213 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReader.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReader.java
@@ -22,7 +22,6 @@ import java.io.IOException;
 
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
-import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;
@@ -41,7 +40,6 @@ public class TableRecordReader
 extends RecordReader<ImmutableBytesWritable, Result> {
 
   private TableRecordReaderImpl recordReaderImpl = new TableRecordReaderImpl();
-  private Connection connection = null;
 
   /**
    * Restart from survivable exceptions by creating a new scanner.
@@ -87,10 +85,8 @@ extends RecordReader<ImmutableBytesWritable, Result> {
    * @see org.apache.hadoop.mapreduce.RecordReader#close()
    */
   @Override
-  public void close() throws IOException {
+  public void close() {
     this.recordReaderImpl.close();
-    if (this.connection != null)
-      this.connection.close();
   }
 
   /**
@@ -162,7 +158,4 @@ extends RecordReader<ImmutableBytesWritable, Result> {
     return this.recordReaderImpl.getProgress();
   }
 
-  public void setConnection(Connection connection) {
-    this.connection = connection;
-  }
 }

Reply via email to