release HBaseAdmin after using it.
Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/3407a81e Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/3407a81e Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/3407a81e Branch: refs/heads/master Commit: 3407a81ed9ea1468ab0359866d5e921ece3db8c7 Parents: 549279d Author: DO YUNG YOON <[email protected]> Authored: Thu May 4 10:48:06 2017 +0900 Committer: DO YUNG YOON <[email protected]> Committed: Thu May 4 10:48:06 2017 +0900 ---------------------------------------------------------------------- .../core/storage/hbase/AsynchbaseStorage.scala | 170 ++++++++++--------- 1 file changed, 90 insertions(+), 80 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/3407a81e/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala index e41fe27..5c9695d 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala @@ -29,7 +29,7 @@ import com.stumbleupon.async.{Callback, Deferred} import com.typesafe.config.Config import org.apache.commons.io.FileUtils import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.hbase.client.{ConnectionFactory, Durability} +import org.apache.hadoop.hbase.client.{Admin, ConnectionFactory, Durability} import org.apache.hadoop.hbase.io.compress.Compression.Algorithm import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding import org.apache.hadoop.hbase.regionserver.BloomType @@ -560,97 +560,92 @@ class AsynchbaseStorage(override val graph: S2Graph, zkAddr <- Seq(zkQuorum) ++ zkQuorumSlave.toSeq } { logger.info(s"create table: $tableName on $zkAddr, $cfs, $regionMultiplier, $compressionAlgorithm") - val admin = getAdmin(zkAddr) - val regionCount = totalRegionCount.getOrElse(admin.getClusterStatus.getServersSize * regionMultiplier) - try { - if (!admin.tableExists(TableName.valueOf(tableName))) { - val desc = new HTableDescriptor(TableName.valueOf(tableName)) - desc.setDurability(Durability.ASYNC_WAL) - for (cf <- cfs) { - val columnDesc = new HColumnDescriptor(cf) - .setCompressionType(Algorithm.valueOf(compressionAlgorithm.toUpperCase)) - .setBloomFilterType(BloomType.ROW) - .setDataBlockEncoding(DataBlockEncoding.FAST_DIFF) - .setMaxVersions(1) - .setTimeToLive(2147483647) - .setMinVersions(0) - .setBlocksize(32768) - .setBlockCacheEnabled(true) + withAdmin(zkAddr) { admin => + val regionCount = totalRegionCount.getOrElse(admin.getClusterStatus.getServersSize * regionMultiplier) + try { + if (!admin.tableExists(TableName.valueOf(tableName))) { + val desc = new HTableDescriptor(TableName.valueOf(tableName)) + desc.setDurability(Durability.ASYNC_WAL) + for (cf <- cfs) { + val columnDesc = new HColumnDescriptor(cf) + .setCompressionType(Algorithm.valueOf(compressionAlgorithm.toUpperCase)) + .setBloomFilterType(BloomType.ROW) + .setDataBlockEncoding(DataBlockEncoding.FAST_DIFF) + .setMaxVersions(1) + .setTimeToLive(2147483647) + .setMinVersions(0) + .setBlocksize(32768) + .setBlockCacheEnabled(true) // FIXME: For test!! - .setInMemory(true) - if (ttl.isDefined) columnDesc.setTimeToLive(ttl.get) - if (replicationScopeOpt.isDefined) columnDesc.setScope(replicationScopeOpt.get) - desc.addFamily(columnDesc) - } + .setInMemory(true) + if (ttl.isDefined) columnDesc.setTimeToLive(ttl.get) + if (replicationScopeOpt.isDefined) columnDesc.setScope(replicationScopeOpt.get) + desc.addFamily(columnDesc) + } - if (regionCount <= 1) admin.createTable(desc) - else admin.createTable(desc, getStartKey(regionCount), getEndKey(regionCount), regionCount) - } else { - logger.info(s"$zkAddr, $tableName, $cfs already exist.") + if (regionCount <= 1) admin.createTable(desc) + else admin.createTable(desc, getStartKey(regionCount), getEndKey(regionCount), regionCount) + } else { + logger.info(s"$zkAddr, $tableName, $cfs already exist.") + } + } catch { + case e: Throwable => + logger.error(s"$zkAddr, $tableName failed with $e", e) + throw e } - } catch { - case e: Throwable => - logger.error(s"$zkAddr, $tableName failed with $e", e) - throw e - } finally { - admin.close() - admin.getConnection.close() } } } override def truncateTable(zkAddr: String, tableNameStr: String): Unit = { - val tableName = TableName.valueOf(tableNameStr) - val adminTry = Try(getAdmin(zkAddr)) - if (adminTry.isFailure) return - val admin = adminTry.get - - if (!Try(admin.tableExists(tableName)).getOrElse(false)) { - logger.info(s"No table to truncate ${tableNameStr}") - return - } + withAdmin(zkAddr) { admin => + val tableName = TableName.valueOf(tableNameStr) + if (!Try(admin.tableExists(tableName)).getOrElse(false)) { + logger.info(s"No table to truncate ${tableNameStr}") + return + } - Try(admin.isTableDisabled(tableName)).map { - case true => - logger.info(s"${tableNameStr} is already disabled.") + Try(admin.isTableDisabled(tableName)).map { + case true => + logger.info(s"${tableNameStr} is already disabled.") - case false => - logger.info(s"Before disabling to trucate ${tableNameStr}") - Try(admin.disableTable(tableName)).recover { - case NonFatal(e) => - logger.info(s"Failed to disable ${tableNameStr}: ${e}") - } - logger.info(s"After disabling to trucate ${tableNameStr}") - } + case false => + logger.info(s"Before disabling to trucate ${tableNameStr}") + Try(admin.disableTable(tableName)).recover { + case NonFatal(e) => + logger.info(s"Failed to disable ${tableNameStr}: ${e}") + } + logger.info(s"After disabling to trucate ${tableNameStr}") + } - logger.info(s"Before truncating ${tableNameStr}") - Try(admin.truncateTable(tableName, true)).recover { - case NonFatal(e) => - logger.info(s"Failed to truncate ${tableNameStr}: ${e}") - } - logger.info(s"After truncating ${tableNameStr}") - Try(admin.close()).recover { - case NonFatal(e) => - logger.info(s"Failed to close admin ${tableNameStr}: ${e}") - } - Try(admin.getConnection.close()).recover { - case NonFatal(e) => - logger.info(s"Failed to close connection ${tableNameStr}: ${e}") + logger.info(s"Before truncating ${tableNameStr}") + Try(admin.truncateTable(tableName, true)).recover { + case NonFatal(e) => + logger.info(s"Failed to truncate ${tableNameStr}: ${e}") + } + logger.info(s"After truncating ${tableNameStr}") + Try(admin.close()).recover { + case NonFatal(e) => + logger.info(s"Failed to close admin ${tableNameStr}: ${e}") + } + Try(admin.getConnection.close()).recover { + case NonFatal(e) => + logger.info(s"Failed to close connection ${tableNameStr}: ${e}") + } } - } override def deleteTable(zkAddr: String, tableNameStr: String): Unit = { - val admin = getAdmin(zkAddr) - val tableName = TableName.valueOf(tableNameStr) - if (!admin.tableExists(tableName)) { - return - } - if (admin.isTableEnabled(tableName)) { - admin.disableTable(tableName) + withAdmin(zkAddr) { admin => + val tableName = TableName.valueOf(tableNameStr) + if (!admin.tableExists(tableName)) { + return + } + if (admin.isTableEnabled(tableName)) { + admin.disableTable(tableName) + } + admin.deleteTable(tableName) } - admin.deleteTable(tableName) - admin.close() } /** Asynchbase implementation override default getVertices to use future Cache */ @@ -827,6 +822,15 @@ class AsynchbaseStorage(override val graph: S2Graph, conn.getAdmin } + private def withAdmin(zkAddr: String)(op: Admin => Unit): Unit = { + val admin = getAdmin(zkAddr) + try { + op(admin) + } finally { + admin.close() + admin.getConnection.close() + } + } /** * following configuration need to come together to use secured hbase cluster. * 1. set hbase.security.auth.enable = true @@ -850,16 +854,22 @@ class AsynchbaseStorage(override val graph: S2Graph, } private def enableTable(zkAddr: String, tableName: String) = { - getAdmin(zkAddr).enableTable(TableName.valueOf(tableName)) + withAdmin(zkAddr) { admin => + admin.enableTable(TableName.valueOf(tableName)) + } } private def disableTable(zkAddr: String, tableName: String) = { - getAdmin(zkAddr).disableTable(TableName.valueOf(tableName)) + withAdmin(zkAddr) { admin => + admin.disableTable(TableName.valueOf(tableName)) + } } private def dropTable(zkAddr: String, tableName: String) = { - getAdmin(zkAddr).disableTable(TableName.valueOf(tableName)) - getAdmin(zkAddr).deleteTable(TableName.valueOf(tableName)) + withAdmin(zkAddr) { admin => + admin.disableTable(TableName.valueOf(tableName)) + admin.deleteTable(TableName.valueOf(tableName)) + } } private def getStartKey(regionCount: Int): Array[Byte] = {
