release HBaseAdmin after using it.

Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/3407a81e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/3407a81e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/3407a81e

Branch: refs/heads/master
Commit: 3407a81ed9ea1468ab0359866d5e921ece3db8c7
Parents: 549279d
Author: DO YUNG YOON <[email protected]>
Authored: Thu May 4 10:48:06 2017 +0900
Committer: DO YUNG YOON <[email protected]>
Committed: Thu May 4 10:48:06 2017 +0900

----------------------------------------------------------------------
 .../core/storage/hbase/AsynchbaseStorage.scala  | 170 ++++++++++---------
 1 file changed, 90 insertions(+), 80 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/3407a81e/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
index e41fe27..5c9695d 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
@@ -29,7 +29,7 @@ import com.stumbleupon.async.{Callback, Deferred}
 import com.typesafe.config.Config
 import org.apache.commons.io.FileUtils
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.hbase.client.{ConnectionFactory, Durability}
+import org.apache.hadoop.hbase.client.{Admin, ConnectionFactory, Durability}
 import org.apache.hadoop.hbase.io.compress.Compression.Algorithm
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding
 import org.apache.hadoop.hbase.regionserver.BloomType
@@ -560,97 +560,92 @@ class AsynchbaseStorage(override val graph: S2Graph,
       zkAddr <- Seq(zkQuorum) ++ zkQuorumSlave.toSeq
     } {
       logger.info(s"create table: $tableName on $zkAddr, $cfs, $regionMultiplier, $compressionAlgorithm")
-      val admin = getAdmin(zkAddr)
-      val regionCount = totalRegionCount.getOrElse(admin.getClusterStatus.getServersSize * regionMultiplier)
-      try {
-        if (!admin.tableExists(TableName.valueOf(tableName))) {
-          val desc = new HTableDescriptor(TableName.valueOf(tableName))
-          desc.setDurability(Durability.ASYNC_WAL)
-          for (cf <- cfs) {
-            val columnDesc = new HColumnDescriptor(cf)
-              .setCompressionType(Algorithm.valueOf(compressionAlgorithm.toUpperCase))
-              .setBloomFilterType(BloomType.ROW)
-              .setDataBlockEncoding(DataBlockEncoding.FAST_DIFF)
-              .setMaxVersions(1)
-              .setTimeToLive(2147483647)
-              .setMinVersions(0)
-              .setBlocksize(32768)
-              .setBlockCacheEnabled(true)
+      withAdmin(zkAddr) { admin =>
+        val regionCount = totalRegionCount.getOrElse(admin.getClusterStatus.getServersSize * regionMultiplier)
+        try {
+          if (!admin.tableExists(TableName.valueOf(tableName))) {
+            val desc = new HTableDescriptor(TableName.valueOf(tableName))
+            desc.setDurability(Durability.ASYNC_WAL)
+            for (cf <- cfs) {
+              val columnDesc = new HColumnDescriptor(cf)
+                .setCompressionType(Algorithm.valueOf(compressionAlgorithm.toUpperCase))
+                .setBloomFilterType(BloomType.ROW)
+                .setDataBlockEncoding(DataBlockEncoding.FAST_DIFF)
+                .setMaxVersions(1)
+                .setTimeToLive(2147483647)
+                .setMinVersions(0)
+                .setBlocksize(32768)
+                .setBlockCacheEnabled(true)
                 // FIXME: For test!!
-              .setInMemory(true)
-            if (ttl.isDefined) columnDesc.setTimeToLive(ttl.get)
-            if (replicationScopeOpt.isDefined) columnDesc.setScope(replicationScopeOpt.get)
-            desc.addFamily(columnDesc)
-          }
+                .setInMemory(true)
+              if (ttl.isDefined) columnDesc.setTimeToLive(ttl.get)
+              if (replicationScopeOpt.isDefined) columnDesc.setScope(replicationScopeOpt.get)
+              desc.addFamily(columnDesc)
+            }
 
-          if (regionCount <= 1) admin.createTable(desc)
-          else admin.createTable(desc, getStartKey(regionCount), getEndKey(regionCount), regionCount)
-        } else {
-          logger.info(s"$zkAddr, $tableName, $cfs already exist.")
+            if (regionCount <= 1) admin.createTable(desc)
+            else admin.createTable(desc, getStartKey(regionCount), getEndKey(regionCount), regionCount)
+          } else {
+            logger.info(s"$zkAddr, $tableName, $cfs already exist.")
+          }
+        } catch {
+          case e: Throwable =>
+            logger.error(s"$zkAddr, $tableName failed with $e", e)
+            throw e
         }
-      } catch {
-        case e: Throwable =>
-          logger.error(s"$zkAddr, $tableName failed with $e", e)
-          throw e
-      } finally {
-        admin.close()
-        admin.getConnection.close()
       }
     }
   }
 
   override def truncateTable(zkAddr: String, tableNameStr: String): Unit = {
-    val tableName = TableName.valueOf(tableNameStr)
-    val adminTry = Try(getAdmin(zkAddr))
-    if (adminTry.isFailure) return
-    val admin = adminTry.get
-
-    if (!Try(admin.tableExists(tableName)).getOrElse(false)) {
-      logger.info(s"No table to truncate ${tableNameStr}")
-      return
-    }
+    withAdmin(zkAddr) { admin =>
+      val tableName = TableName.valueOf(tableNameStr)
+      if (!Try(admin.tableExists(tableName)).getOrElse(false)) {
+        logger.info(s"No table to truncate ${tableNameStr}")
+        return
+      }
 
-    Try(admin.isTableDisabled(tableName)).map {
-      case true =>
-        logger.info(s"${tableNameStr} is already disabled.")
+      Try(admin.isTableDisabled(tableName)).map {
+        case true =>
+          logger.info(s"${tableNameStr} is already disabled.")
 
-      case false =>
-        logger.info(s"Before disabling to trucate ${tableNameStr}")
-        Try(admin.disableTable(tableName)).recover {
-          case NonFatal(e) =>
-            logger.info(s"Failed to disable ${tableNameStr}: ${e}")
-        }
-        logger.info(s"After disabling to trucate ${tableNameStr}")
-    }
+        case false =>
+          logger.info(s"Before disabling to trucate ${tableNameStr}")
+          Try(admin.disableTable(tableName)).recover {
+            case NonFatal(e) =>
+              logger.info(s"Failed to disable ${tableNameStr}: ${e}")
+          }
+          logger.info(s"After disabling to trucate ${tableNameStr}")
+      }
 
-    logger.info(s"Before truncating ${tableNameStr}")
-    Try(admin.truncateTable(tableName, true)).recover {
-      case NonFatal(e) =>
-        logger.info(s"Failed to truncate ${tableNameStr}: ${e}")
-    }
-    logger.info(s"After truncating ${tableNameStr}")
-    Try(admin.close()).recover {
-      case NonFatal(e) =>
-        logger.info(s"Failed to close admin ${tableNameStr}: ${e}")
-    }
-    Try(admin.getConnection.close()).recover {
-      case NonFatal(e) =>
-        logger.info(s"Failed to close connection ${tableNameStr}: ${e}")
+      logger.info(s"Before truncating ${tableNameStr}")
+      Try(admin.truncateTable(tableName, true)).recover {
+        case NonFatal(e) =>
+          logger.info(s"Failed to truncate ${tableNameStr}: ${e}")
+      }
+      logger.info(s"After truncating ${tableNameStr}")
+      Try(admin.close()).recover {
+        case NonFatal(e) =>
+          logger.info(s"Failed to close admin ${tableNameStr}: ${e}")
+      }
+      Try(admin.getConnection.close()).recover {
+        case NonFatal(e) =>
+          logger.info(s"Failed to close connection ${tableNameStr}: ${e}")
+      }
     }
-
   }
 
   override def deleteTable(zkAddr: String, tableNameStr: String): Unit = {
-    val admin = getAdmin(zkAddr)
-    val tableName = TableName.valueOf(tableNameStr)
-    if (!admin.tableExists(tableName)) {
-      return
-    }
-    if (admin.isTableEnabled(tableName)) {
-      admin.disableTable(tableName)
+    withAdmin(zkAddr) { admin =>
+      val tableName = TableName.valueOf(tableNameStr)
+      if (!admin.tableExists(tableName)) {
+        return
+      }
+      if (admin.isTableEnabled(tableName)) {
+        admin.disableTable(tableName)
+      }
+      admin.deleteTable(tableName)
     }
-    admin.deleteTable(tableName)
-    admin.close()
   }
 
   /** Asynchbase implementation override default getVertices to use future Cache */
@@ -827,6 +822,15 @@ class AsynchbaseStorage(override val graph: S2Graph,
     conn.getAdmin
   }
 
+  private def withAdmin(zkAddr: String)(op: Admin => Unit): Unit = {
+    val admin = getAdmin(zkAddr)
+    try {
+      op(admin)
+    } finally {
+      admin.close()
+      admin.getConnection.close()
+    }
+  }
   /**
    * following configuration need to come together to use secured hbase cluster.
    * 1. set hbase.security.auth.enable = true
@@ -850,16 +854,22 @@ class AsynchbaseStorage(override val graph: S2Graph,
   }
 
   private def enableTable(zkAddr: String, tableName: String) = {
-    getAdmin(zkAddr).enableTable(TableName.valueOf(tableName))
+    withAdmin(zkAddr) { admin =>
+      admin.enableTable(TableName.valueOf(tableName))
+    }
   }
 
   private def disableTable(zkAddr: String, tableName: String) = {
-    getAdmin(zkAddr).disableTable(TableName.valueOf(tableName))
+    withAdmin(zkAddr) { admin =>
+      admin.disableTable(TableName.valueOf(tableName))
+    }
   }
 
   private def dropTable(zkAddr: String, tableName: String) = {
-    getAdmin(zkAddr).disableTable(TableName.valueOf(tableName))
-    getAdmin(zkAddr).deleteTable(TableName.valueOf(tableName))
+    withAdmin(zkAddr) { admin =>
+      admin.disableTable(TableName.valueOf(tableName))
+      admin.deleteTable(TableName.valueOf(tableName))
+    }
   }
 
   private def getStartKey(regionCount: Int): Array[Byte] = {

Reply via email to