Repository: incubator-s2graph Updated Branches: refs/heads/master 6cfbf1d2b -> 66be5c8c3
[S2GRAPH-14] Abstract HBase specific methods in Management and Label Remove Management.createService from Label/Service. Add Management class and move createService/createLabel/createTable from Management object into Management class. Refactor caller of above methods according to changes. JIRA: [S2GRAPH-14] https://issues.apache.org/jira/browse/S2GRAPH-14 Pull Request: Closes #2 Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/66be5c8c Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/66be5c8c Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/66be5c8c Branch: refs/heads/master Commit: 66be5c8c31e89e9131e88a0e0e8a8e2e2a1b57bc Parents: 6cfbf1d Author: DO YUNG YOON <[email protected]> Authored: Fri Jan 15 17:10:51 2016 +0900 Committer: DO YUNG YOON <[email protected]> Committed: Fri Jan 15 17:10:51 2016 +0900 ---------------------------------------------------------------------- CHANGES | 2 + .../com/kakao/s2graph/core/Management.scala | 294 ++++++------------- .../com/kakao/s2graph/core/mysqls/Label.scala | 18 +- .../com/kakao/s2graph/core/mysqls/Service.scala | 6 +- .../kakao/s2graph/core/storage/Storage.scala | 14 +- .../core/storage/hbase/AsynchbaseStorage.scala | 78 ++++- .../core/Integrate/IntegrateCommon.scala | 6 +- .../s2graph/core/TestCommonWithModels.scala | 14 +- .../s2graph/core/parsers/WhereParserTest.scala | 2 + .../s2/counter/core/v1/ExactStorageHBase.scala | 8 +- .../src/main/scala/s2/helper/CounterAdmin.scala | 5 +- .../s2/counter/core/RankingCounterSpec.scala | 5 +- s2rest_play/app/Bootstrap.scala | 4 +- .../app/controllers/AdminController.scala | 13 +- 14 files changed, 227 insertions(+), 242 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/66be5c8c/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index 4ca49c6..188d194 100644 --- a/CHANGES +++ b/CHANGES @@ -6,6 +6,8 @@ Release 0.12.1 - unreleased IMPROVEMENT + S2GRAPH-14: Abstract HBase specific methods in Management and Label (Committed by DOYUNG YOON). + S2GRAPH-24: Add counter config for readonly graph (Committed by Jaesang Kim). BUG FIXES http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/66be5c8c/s2core/src/main/scala/com/kakao/s2graph/core/Management.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/Management.scala b/s2core/src/main/scala/com/kakao/s2graph/core/Management.scala index 5258a3a..32f7a5a 100644 --- a/s2core/src/main/scala/com/kakao/s2graph/core/Management.scala +++ b/s2core/src/main/scala/com/kakao/s2graph/core/Management.scala @@ -4,18 +4,11 @@ package com.kakao.s2graph.core import com.kakao.s2graph.core.GraphExceptions.{InvalidHTableException, LabelAlreadyExistException, LabelNotExistException} import com.kakao.s2graph.core.Management.JsonModel.{Index, Prop} import com.kakao.s2graph.core.mysqls._ +import com.kakao.s2graph.core.storage.Storage +import com.kakao.s2graph.core.types.HBaseType._ import com.kakao.s2graph.core.types._ -import com.kakao.s2graph.core.utils.logger -import org.apache.hadoop.hbase.client.{ConnectionFactory, Durability} -import org.apache.hadoop.hbase.io.compress.Compression.Algorithm -import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding -import org.apache.hadoop.hbase.regionserver.BloomType -import org.apache.hadoop.hbase.util.Bytes -import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HTableDescriptor, TableName} -import play.Play import play.api.libs.json.Reads._ import play.api.libs.json._ - import scala.util.Try /** @@ -36,20 +29,8 @@ object Management extends JSONParser { import HBaseType._ - val hardLimit = 10000 - val defaultLimit = 100 - val defaultCompressionAlgorithm = "gz" -// Play.application().configuration().getString("hbase.table.compression.algorithm") + val DefaultCompressionAlgorithm = "gz" - def createService(serviceName: String, - cluster: String, hTableName: String, - preSplitSize: Int, hTableTTL: Option[Int], - compressionAlgorithm: String): Try[Service] = { - - Model withTx { implicit session => - Service.findOrInsert(serviceName, cluster, hTableName, preSplitSize, hTableTTL, compressionAlgorithm) - } - } def findService(serviceName: String) = { Service.findByName(serviceName, useCache = false) @@ -68,62 +49,7 @@ object Management extends JSONParser { Label.updateHTableName(targetLabel.label, newHTableName) } - /** - * label - */ - /** - * copy label when if oldLabel exist and newLabel do not exist. - * copy label: only used by bulk load job. not sure if we need to parameterize hbase cluster. - */ - def copyLabel(oldLabelName: String, newLabelName: String, hTableName: Option[String]) = { - val old = Label.findByName(oldLabelName).getOrElse(throw new LabelAlreadyExistException(s"Old label $oldLabelName not exists.")) - if (Label.findByName(newLabelName).isDefined) throw new LabelAlreadyExistException(s"New label $newLabelName already exists.") - - val allProps = old.metas.map { labelMeta => Prop(labelMeta.name, labelMeta.defaultValue, labelMeta.dataType) } - val allIndices = old.indices.map { index => Index(index.name, index.propNames) } - - createLabel(newLabelName, old.srcService.serviceName, old.srcColumnName, old.srcColumnType, - old.tgtService.serviceName, old.tgtColumnName, old.tgtColumnType, - old.isDirected, old.serviceName, - allIndices, allProps, - old.consistencyLevel, hTableName, old.hTableTTL, old.schemaVersion, old.isAsync, old.compressionAlgorithm) - } - - def createLabel(label: String, - srcServiceName: String, - srcColumnName: String, - srcColumnType: String, - tgtServiceName: String, - tgtColumnName: String, - tgtColumnType: String, - isDirected: Boolean = true, - serviceName: String, - indices: Seq[Index], - props: Seq[Prop], - consistencyLevel: String, - hTableName: Option[String], - hTableTTL: Option[Int], - schemaVersion: String = DEFAULT_VERSION, - isAsync: Boolean, - compressionAlgorithm: String = defaultCompressionAlgorithm): Try[Label] = { - - val labelOpt = Label.findByName(label, useCache = false) - - Model withTx { implicit session => - labelOpt match { - case Some(l) => - throw new GraphExceptions.LabelAlreadyExistException(s"Label name ${l.label} already exist.") - case None => - Label.insertAll(label, - srcServiceName, srcColumnName, srcColumnType, - tgtServiceName, tgtColumnName, tgtColumnType, - isDirected, serviceName, indices, props, consistencyLevel, - hTableName, hTableTTL, schemaVersion, isAsync, compressionAlgorithm) - Label.findByName(label, useCache = false).get - } - } - } def createServiceColumn(serviceName: String, columnName: String, @@ -326,121 +252,6 @@ object Management extends JSONParser { props } - val idTableName = "id" - val cf = "a" - val idColName = "id" - val regionCnt = 10 - - def getAdmin(zkAddr: String) = { - val conf = HBaseConfiguration.create() - conf.set("hbase.zookeeper.quorum", zkAddr) - val conn = ConnectionFactory.createConnection(conf) - conn.getAdmin - } - - def enableTable(zkAddr: String, tableName: String) = { - getAdmin(zkAddr).enableTable(TableName.valueOf(tableName)) - } - - def disableTable(zkAddr: String, tableName: String) = { - getAdmin(zkAddr).disableTable(TableName.valueOf(tableName)) - } - - def dropTable(zkAddr: String, tableName: String) = { - getAdmin(zkAddr).disableTable(TableName.valueOf(tableName)) - getAdmin(zkAddr).deleteTable(TableName.valueOf(tableName)) - } - - // def deleteEdgesByLabelIds(zkAddr: String, - // tableName: String, - // labelIds: String = "", - // minTs: Long = 0L, - // maxTs: Long = Long.MaxValue, - // include: Boolean = true) = { - // val conf = HBaseConfiguration.create() - // val longTimeout = "1200000" - // conf.set("hbase.rpc.timeout", longTimeout) - // conf.set("hbase.client.operation.timeout", longTimeout) - // conf.set("hbase.client.scanner.timeout.period", longTimeout) - // conf.set("hbase.zookeeper.quorum", zkAddr) - // val conn = HConnectionManager.createConnection(conf) - // val table = conn.getTable(tableName.getBytes) - // var builder = DeleteLabelsArgument.newBuilder() - // val scanner = Scan.newBuilder() - // - // scanner.setTimeRange(TimeRange.newBuilder().setFrom(minTs).setTo(maxTs)) - // /** - // * when we clean up all data does not match current database ids - // * we will delete row completely - // */ - // if (!include) scanner.setFilter(ProtobufUtil.toFilter(new FirstKeyOnlyFilter)) - // - // builder.setScan(scanner) - // for (id <- labelIds.split(",")) { - // builder.addId(id.toInt) - // } - // - // val argument = builder.build() - // - // val regionStats = table.coprocessorService(classOf[GraphStatService], null, null, - // new Batch.Call[GraphStatService, Long]() { - // override def call(counter: GraphStatService): Long = { - // val controller: ServerRpcController = new ServerRpcController() - // val rpcCallback: BlockingRpcCallback[CountResponse] = new BlockingRpcCallback[CountResponse]() - // - // if (include) { - // counter.cleanUpDeleteLabelsRows(controller, argument, rpcCallback) - // } else { - // counter.cleanUpDeleteLabelsRowsExclude(controller, argument, rpcCallback) - // } - // - // val response: CountResponse = rpcCallback.get() - // if (controller.failedOnException()) throw controller.getFailedOn() - // if (response != null && response.hasCount()) { - // response.getCount() - // } else { - // 0L - // } - // } - // }) - // - // // regionStats.map(kv => Bytes.toString(kv._1) -> kv._2) ++ Map("total" -> regionStats.values().sum) - // } - - def createTable(zkAddr: String, tableName: String, cfs: List[String], regionMultiplier: Int, ttl: Option[Int], - compressionAlgorithm: String = defaultCompressionAlgorithm) = { - logger.info(s"create table: $tableName on $zkAddr, $cfs, $regionMultiplier, $compressionAlgorithm") - val admin = getAdmin(zkAddr) - val regionCount = admin.getClusterStatus.getServersSize * regionMultiplier - if (!admin.tableExists(TableName.valueOf(tableName))) { - try { - val desc = new HTableDescriptor(TableName.valueOf(tableName)) - desc.setDurability(Durability.ASYNC_WAL) - for (cf <- cfs) { - val columnDesc = new HColumnDescriptor(cf) - .setCompressionType(Algorithm.valueOf(compressionAlgorithm.toUpperCase)) - .setBloomFilterType(BloomType.ROW) - .setDataBlockEncoding(DataBlockEncoding.FAST_DIFF) - .setMaxVersions(1) - .setTimeToLive(2147483647) - .setMinVersions(0) - .setBlocksize(32768) - .setBlockCacheEnabled(true) - if (ttl.isDefined) columnDesc.setTimeToLive(ttl.get) - desc.addFamily(columnDesc) - } - - if (regionCount <= 1) admin.createTable(desc) - else admin.createTable(desc, getStartKey(regionCount), getEndKey(regionCount), regionCount) - } catch { - case e: Throwable => - logger.error(s"$zkAddr, $tableName failed with $e", e) - throw e - } - } else { - logger.info(s"$zkAddr, $tableName, $cf already exist.") - } - } /** * update label name. @@ -457,15 +268,102 @@ object Management extends JSONParser { } } } +} + +class Management(graph: Graph) { + import Management._ + val storage = graph.storage - // we only use murmur hash to distribute row key. - def getStartKey(regionCount: Int): Array[Byte] = { - Bytes.toBytes((Int.MaxValue / regionCount)) + def createTable(zkAddr: String, + tableName: String, + cfs: List[String], + regionMultiplier: Int, + ttl: Option[Int], + compressionAlgorithm: String = DefaultCompressionAlgorithm): Unit = + storage.createTable(zkAddr, tableName, cfs, regionMultiplier, ttl, compressionAlgorithm) + + /** HBase specific code */ + def createService(serviceName: String, + cluster: String, hTableName: String, + preSplitSize: Int, hTableTTL: Option[Int], + compressionAlgorithm: String = DefaultCompressionAlgorithm): Try[Service] = { + + Model withTx { implicit session => + val service = Service.findOrInsert(serviceName, cluster, hTableName, preSplitSize, hTableTTL, compressionAlgorithm) + /** create hbase table for service */ + storage.createTable(cluster, hTableName, List("e", "v"), preSplitSize, hTableTTL, compressionAlgorithm) + service + } } - def getEndKey(regionCount: Int): Array[Byte] = { - Bytes.toBytes((Int.MaxValue / regionCount * (regionCount - 1))) + /** HBase specific code */ + def createLabel(label: String, + srcServiceName: String, + srcColumnName: String, + srcColumnType: String, + tgtServiceName: String, + tgtColumnName: String, + tgtColumnType: String, + isDirected: Boolean = true, + serviceName: String, + indices: Seq[Index], + props: Seq[Prop], + consistencyLevel: String, + hTableName: Option[String], + hTableTTL: Option[Int], + schemaVersion: String = DEFAULT_VERSION, + isAsync: Boolean, + compressionAlgorithm: String = "gz"): Try[Label] = { + + val labelOpt = Label.findByName(label, useCache = false) + + Model withTx { implicit session => + labelOpt match { + case Some(l) => + throw new GraphExceptions.LabelAlreadyExistException(s"Label name ${l.label} already exist.") + case None => + /** create all models */ + val newLabel = Label.insertAll(label, + srcServiceName, srcColumnName, srcColumnType, + tgtServiceName, tgtColumnName, tgtColumnType, + isDirected, serviceName, indices, props, consistencyLevel, + hTableName, hTableTTL, schemaVersion, isAsync, compressionAlgorithm) + + /** create hbase table */ + val service = newLabel.service + (hTableName, hTableTTL) match { + case (None, None) => // do nothing + case (None, Some(hbaseTableTTL)) => throw new RuntimeException("if want to specify ttl, give hbaseTableName also") + case (Some(hbaseTableName), None) => + // create own hbase table with default ttl on service level. + storage.createTable(service.cluster, hbaseTableName, List("e", "v"), service.preSplitSize, service.hTableTTL, compressionAlgorithm) + case (Some(hbaseTableName), Some(hbaseTableTTL)) => + // create own hbase table with own ttl. + storage.createTable(service.cluster, hbaseTableName, List("e", "v"), service.preSplitSize, hTableTTL, compressionAlgorithm) + } + newLabel + } + } } + /** + * label + */ + /** + * copy label when if oldLabel exist and newLabel do not exist. + * copy label: only used by bulk load job. not sure if we need to parameterize hbase cluster. + */ + def copyLabel(oldLabelName: String, newLabelName: String, hTableName: Option[String]) = { + val old = Label.findByName(oldLabelName).getOrElse(throw new LabelAlreadyExistException(s"Old label $oldLabelName not exists.")) + if (Label.findByName(newLabelName).isDefined) throw new LabelAlreadyExistException(s"New label $newLabelName already exists.") + + val allProps = old.metas.map { labelMeta => Prop(labelMeta.name, labelMeta.defaultValue, labelMeta.dataType) } + val allIndices = old.indices.map { index => Index(index.name, index.propNames) } -} + createLabel(newLabelName, old.srcService.serviceName, old.srcColumnName, old.srcColumnType, + old.tgtService.serviceName, old.tgtColumnName, old.tgtColumnType, + old.isDirected, old.serviceName, + allIndices, allProps, + old.consistencyLevel, hTableName, old.hTableTTL, old.schemaVersion, old.isAsync, old.compressionAlgorithm) + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/66be5c8c/s2core/src/main/scala/com/kakao/s2graph/core/mysqls/Label.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/mysqls/Label.scala b/s2core/src/main/scala/com/kakao/s2graph/core/mysqls/Label.scala index 1e49abd..005a01e 100644 --- a/s2core/src/main/scala/com/kakao/s2graph/core/mysqls/Label.scala +++ b/s2core/src/main/scala/com/kakao/s2graph/core/mysqls/Label.scala @@ -1,9 +1,5 @@ package com.kakao.s2graph.core.mysqls -/** - * Created by shon on 6/3/15. - */ - import com.kakao.s2graph.core.GraphExceptions.ModelNotFoundException import com.kakao.s2graph.core.Management.JsonModel.{Index, Prop} import com.kakao.s2graph.core.utils.logger @@ -141,7 +137,7 @@ object Label extends Model[Label] { hTableTTL: Option[Int], schemaVersion: String, isAsync: Boolean, - compressionAlgorithm: String)(implicit session: DBSession = AutoSession) = { + compressionAlgorithm: String)(implicit session: DBSession = AutoSession): Label = { val srcServiceOpt = Service.findByName(srcServiceName, useCache = false) val tgtServiceOpt = Service.findByName(tgtServiceName, useCache = false) @@ -188,18 +184,6 @@ object Label extends Model[Label] { } } - /** TODO: */ - (hTableName, hTableTTL) match { - case (None, None) => // do nothing - case (None, Some(hbaseTableTTL)) => throw new RuntimeException("if want to specify ttl, give hbaseTableName also") - case (Some(hbaseTableName), None) => - // create own hbase table with default ttl on service level. - Management.createTable(service.cluster, hbaseTableName, List("e", "v"), service.preSplitSize, service.hTableTTL, compressionAlgorithm) - case (Some(hbaseTableName), Some(hbaseTableTTL)) => - // create own hbase table with own ttl. - Management.createTable(service.cluster, hbaseTableName, List("e", "v"), service.preSplitSize, hTableTTL, compressionAlgorithm) - } - val cacheKeys = List(s"id=$createdId", s"label=$labelName") val ret = findByName(labelName, useCache = false).get putsToCache(cacheKeys.map(k => k -> ret)) http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/66be5c8c/s2core/src/main/scala/com/kakao/s2graph/core/mysqls/Service.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/mysqls/Service.scala b/s2core/src/main/scala/com/kakao/s2graph/core/mysqls/Service.scala index edd36df..2840db8 100644 --- a/s2core/src/main/scala/com/kakao/s2graph/core/mysqls/Service.scala +++ b/s2core/src/main/scala/com/kakao/s2graph/core/mysqls/Service.scala @@ -1,8 +1,5 @@ package com.kakao.s2graph.core.mysqls -/** - * Created by shon on 6/3/15. - */ import java.util.UUID @@ -39,12 +36,11 @@ object Service extends Model[Service] { def insert(serviceName: String, cluster: String, hTableName: String, preSplitSize: Int, hTableTTL: Option[Int], - compressionAlgorithm: String)(implicit session: DBSession = AutoSession) = { + compressionAlgorithm: String)(implicit session: DBSession = AutoSession): Unit = { logger.info(s"$serviceName, $cluster, $hTableName, $preSplitSize, $hTableTTL, $compressionAlgorithm") val accessToken = UUID.randomUUID().toString() sql"""insert into services(service_name, access_token, cluster, hbase_table_name, pre_split_size, hbase_table_ttl) values(${serviceName}, ${accessToken}, ${cluster}, ${hTableName}, ${preSplitSize}, ${hTableTTL})""".execute.apply() - Management.createTable(cluster, hTableName, List("e", "v"), preSplitSize, hTableTTL, compressionAlgorithm) } def delete(id: Int)(implicit session: DBSession = AutoSession) = { http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/66be5c8c/s2core/src/main/scala/com/kakao/s2graph/core/storage/Storage.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/storage/Storage.scala b/s2core/src/main/scala/com/kakao/s2graph/core/storage/Storage.scala index 79303a5..bff0f3b 100644 --- a/s2core/src/main/scala/com/kakao/s2graph/core/storage/Storage.scala +++ b/s2core/src/main/scala/com/kakao/s2graph/core/storage/Storage.scala @@ -2,14 +2,16 @@ package com.kakao.s2graph.core.storage import com.google.common.cache.Cache import com.kakao.s2graph.core._ -import com.kakao.s2graph.core.mysqls.Label +import com.kakao.s2graph.core.mysqls.{Service, Label} import com.kakao.s2graph.core.utils.logger +import com.typesafe.config.Config import scala.collection.Seq import scala.concurrent.{ExecutionContext, Future} +import scala.util.Try -abstract class Storage(implicit ec: ExecutionContext) { +abstract class Storage(val config: Config)(implicit ec: ExecutionContext) { def cacheOpt: Option[Cache[Integer, Seq[QueryResult]]] @@ -67,6 +69,12 @@ abstract class Storage(implicit ec: ExecutionContext) { def flush(): Unit + def createTable(zkAddr: String, + tableName: String, + cfs: List[String], + regionMultiplier: Int, + ttl: Option[Int], + compressionAlgorithm: String): Unit def toEdge[K: CanSKeyValue](kv: K, queryParam: QueryParam, @@ -133,5 +141,7 @@ abstract class Storage(implicit ec: ExecutionContext) { } } } + + } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/66be5c8c/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/AsynchbaseStorage.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/AsynchbaseStorage.scala b/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/AsynchbaseStorage.scala index 74f580e..3ccb473 100644 --- a/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/AsynchbaseStorage.scala +++ b/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/AsynchbaseStorage.scala @@ -11,6 +11,12 @@ import com.kakao.s2graph.core.types._ import com.kakao.s2graph.core.utils.{Extensions, logger} import com.stumbleupon.async.Deferred import com.typesafe.config.Config +import org.apache.hadoop.hbase.client.{ConnectionFactory, Durability} +import org.apache.hadoop.hbase.io.compress.Compression.Algorithm +import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding +import org.apache.hadoop.hbase.regionserver.BloomType +import org.apache.hadoop.hbase.util.Bytes +import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HTableDescriptor, TableName} import org.hbase.async._ import scala.collection.JavaConversions._ @@ -44,8 +50,8 @@ object AsynchbaseStorage { } } -class AsynchbaseStorage(val config: Config, vertexCache: Cache[Integer, Option[Vertex]]) - (implicit ec: ExecutionContext) extends Storage { +class AsynchbaseStorage(override val config: Config, vertexCache: Cache[Integer, Option[Vertex]]) + (implicit ec: ExecutionContext) extends Storage(config) { import AsynchbaseStorage._ @@ -737,4 +743,72 @@ class AsynchbaseStorage(val config: Config, vertexCache: Cache[Integer, Option[V Await.result(client.flush().toFuture, timeout) } + + def createTable(zkAddr: String, + tableName: String, + cfs: List[String], + regionMultiplier: Int, + ttl: Option[Int], + compressionAlgorithm: String): Unit = { + logger.info(s"create table: $tableName on $zkAddr, $cfs, $regionMultiplier, $compressionAlgorithm") + val admin = getAdmin(zkAddr) + val regionCount = admin.getClusterStatus.getServersSize * regionMultiplier + if (!admin.tableExists(TableName.valueOf(tableName))) { + try { + val desc = new HTableDescriptor(TableName.valueOf(tableName)) + desc.setDurability(Durability.ASYNC_WAL) + for (cf <- cfs) { + val columnDesc = new HColumnDescriptor(cf) + .setCompressionType(Algorithm.valueOf(compressionAlgorithm.toUpperCase)) + .setBloomFilterType(BloomType.ROW) + .setDataBlockEncoding(DataBlockEncoding.FAST_DIFF) + .setMaxVersions(1) + .setTimeToLive(2147483647) + .setMinVersions(0) + .setBlocksize(32768) + .setBlockCacheEnabled(true) + if (ttl.isDefined) columnDesc.setTimeToLive(ttl.get) + desc.addFamily(columnDesc) + } + + if (regionCount <= 1) admin.createTable(desc) + else admin.createTable(desc, getStartKey(regionCount), getEndKey(regionCount), regionCount) + } catch { + case e: Throwable => + logger.error(s"$zkAddr, $tableName failed with $e", e) + throw e + } + } else { + logger.info(s"$zkAddr, $tableName, $cfs already exist.") + } + } + + + private def getAdmin(zkAddr: String) = { + val conf = HBaseConfiguration.create() + conf.set("hbase.zookeeper.quorum", zkAddr) + val conn = ConnectionFactory.createConnection(conf) + conn.getAdmin + } + private def enableTable(zkAddr: String, tableName: String) = { + getAdmin(zkAddr).enableTable(TableName.valueOf(tableName)) + } + + private def disableTable(zkAddr: String, tableName: String) = { + getAdmin(zkAddr).disableTable(TableName.valueOf(tableName)) + } + + private def dropTable(zkAddr: String, tableName: String) = { + getAdmin(zkAddr).disableTable(TableName.valueOf(tableName)) + getAdmin(zkAddr).deleteTable(TableName.valueOf(tableName)) + } + + private def getStartKey(regionCount: Int): Array[Byte] = { + Bytes.toBytes((Int.MaxValue / regionCount)) + } + + private def getEndKey(regionCount: Int): Array[Byte] = { + Bytes.toBytes((Int.MaxValue / regionCount * (regionCount - 1))) + } + } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/66be5c8c/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/IntegrateCommon.scala ---------------------------------------------------------------------- diff --git a/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/IntegrateCommon.scala b/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/IntegrateCommon.scala index 0ced48f..e60b824 100644 --- a/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/IntegrateCommon.scala +++ b/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/IntegrateCommon.scala @@ -17,11 +17,13 @@ trait IntegrateCommon extends FunSuite with Matchers with BeforeAndAfterAll { var graph: Graph = _ var parser: RequestParser = _ + var management: Management = _ var config: Config = _ override def beforeAll = { config = ConfigFactory.load() graph = new Graph(config)(ExecutionContext.Implicits.global) + management = new Management(graph) parser = new RequestParser(graph.config) initTestData() } @@ -43,7 +45,7 @@ trait IntegrateCommon extends FunSuite with Matchers with BeforeAndAfterAll { parser.toServiceElements(jsValue) val tryRes = - Management.createService(serviceName, cluster, tableName, preSplitSize, ttl, compressionAlgorithm) + management.createService(serviceName, cluster, tableName, preSplitSize, ttl, compressionAlgorithm) println(s">> Service created : $createService, $tryRes") val labelNames = Map(testLabelName -> testLabelNameCreate, @@ -60,7 +62,7 @@ trait IntegrateCommon extends FunSuite with Matchers with BeforeAndAfterAll { val json = Json.parse(create) val tryRes = for { labelArgs <- parser.toLabelElements(json) - label <- (Management.createLabel _).tupled(labelArgs) + label <- (management.createLabel _).tupled(labelArgs) } yield label tryRes.get http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/66be5c8c/s2core/src/test/scala/com/kakao/s2graph/core/TestCommonWithModels.scala ---------------------------------------------------------------------- diff --git a/s2core/src/test/scala/com/kakao/s2graph/core/TestCommonWithModels.scala b/s2core/src/test/scala/com/kakao/s2graph/core/TestCommonWithModels.scala index e2a3363..4fa7b59 100644 --- a/s2core/src/test/scala/com/kakao/s2graph/core/TestCommonWithModels.scala +++ b/s2core/src/test/scala/com/kakao/s2graph/core/TestCommonWithModels.scala @@ -18,10 +18,12 @@ trait TestCommonWithModels { var graph: Graph = _ var config: Config = _ + var management: Management = _ def initTests() = { config = ConfigFactory.load() graph = new Graph(config)(ExecutionContext.Implicits.global) + management = new Management(graph) implicit val session = AutoSession @@ -75,8 +77,8 @@ trait TestCommonWithModels { def createTestService() = { implicit val session = AutoSession - Management.createService(serviceName, cluster, hTableName, preSplitSize, hTableTTL = None, "gz") - Management.createService(serviceNameV2, cluster, hTableName, preSplitSize, hTableTTL = None, "gz") + management.createService(serviceName, cluster, hTableName, preSplitSize, hTableTTL = None, "gz") + management.createService(serviceNameV2, cluster, hTableName, preSplitSize, hTableTTL = None, "gz") } def deleteTestService() = { @@ -95,16 +97,16 @@ trait TestCommonWithModels { def createTestLabel() = { implicit val session = AutoSession - Management.createLabel(labelName, serviceName, columnName, columnType, serviceName, columnName, columnType, + management.createLabel(labelName, serviceName, columnName, columnType, serviceName, columnName, columnType, isDirected = true, serviceName, testIdxProps, testProps, consistencyLevel, Some(hTableName), hTableTTL, VERSION1, false, "lg4") - Management.createLabel(labelNameV2, serviceNameV2, columnNameV2, columnTypeV2, serviceNameV2, tgtColumnNameV2, tgtColumnTypeV2, + management.createLabel(labelNameV2, serviceNameV2, columnNameV2, columnTypeV2, serviceNameV2, tgtColumnNameV2, tgtColumnTypeV2, isDirected = true, serviceNameV2, testIdxProps, testProps, consistencyLevel, Some(hTableName), hTableTTL, VERSION2, false, "lg4") - Management.createLabel(undirectedLabelName, serviceName, columnName, columnType, serviceName, tgtColumnName, tgtColumnType, + management.createLabel(undirectedLabelName, serviceName, columnName, columnType, serviceName, tgtColumnName, tgtColumnType, isDirected = false, serviceName, testIdxProps, testProps, consistencyLevel, Some(hTableName), hTableTTL, VERSION1, false, "lg4") - Management.createLabel(undirectedLabelNameV2, serviceNameV2, columnNameV2, columnTypeV2, serviceNameV2, tgtColumnNameV2, tgtColumnTypeV2, + management.createLabel(undirectedLabelNameV2, serviceNameV2, columnNameV2, columnTypeV2, serviceNameV2, tgtColumnNameV2, tgtColumnTypeV2, isDirected = false, serviceName, testIdxProps, testProps, consistencyLevel, Some(hTableName), hTableTTL, VERSION2, false, "lg4") } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/66be5c8c/s2core/src/test/scala/com/kakao/s2graph/core/parsers/WhereParserTest.scala ---------------------------------------------------------------------- diff --git a/s2core/src/test/scala/com/kakao/s2graph/core/parsers/WhereParserTest.scala b/s2core/src/test/scala/com/kakao/s2graph/core/parsers/WhereParserTest.scala index 393d8b1..659983c 100644 --- a/s2core/src/test/scala/com/kakao/s2graph/core/parsers/WhereParserTest.scala +++ b/s2core/src/test/scala/com/kakao/s2graph/core/parsers/WhereParserTest.scala @@ -8,6 +8,8 @@ import play.api.libs.json.Json class WhereParserTest extends FunSuite with Matchers with TestCommonWithModels { // dummy data for dummy edge + initTests() + import HBaseType.{VERSION1, VERSION2} val ts = System.currentTimeMillis() http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/66be5c8c/s2counter_core/src/main/scala/s2/counter/core/v1/ExactStorageHBase.scala ---------------------------------------------------------------------- diff --git a/s2counter_core/src/main/scala/s2/counter/core/v1/ExactStorageHBase.scala b/s2counter_core/src/main/scala/s2/counter/core/v1/ExactStorageHBase.scala index b946f0a..a664de4 100644 --- a/s2counter_core/src/main/scala/s2/counter/core/v1/ExactStorageHBase.scala +++ b/s2counter_core/src/main/scala/s2/counter/core/v1/ExactStorageHBase.scala @@ -1,5 +1,6 @@ package s2.counter.core.v1 +import com.kakao.s2graph.core.Graph import com.typesafe.config.Config import org.apache.hadoop.hbase.CellUtil import org.apache.hadoop.hbase.client._ @@ -17,9 +18,7 @@ import scala.collection.JavaConversions._ import scala.concurrent.{ExecutionContext, Future} import scala.util.{Failure, Success} -/** - * Created by hsleep([email protected]) on 2015. 10. 1.. - */ + class ExactStorageHBase(config: Config) extends ExactStorage { import ExactStorageHBase._ @@ -30,6 +29,8 @@ class ExactStorageHBase(config: Config) extends ExactStorage { private[counter] val withHBase = new WithHBase(config) private[counter] val hbaseManagement = new Management(config) + + private def getTableName(policy: Counter): String = { policy.hbaseTable.getOrElse(s2config.HBASE_TABLE_NAME) } @@ -280,6 +281,7 @@ class ExactStorageHBase(config: Config) extends ExactStorage { override def prepare(policy: Counter): Unit = { // create hbase table policy.hbaseTable.foreach { table => + if (!hbaseManagement.tableExists(s2config.HBASE_ZOOKEEPER_QUORUM, table)) { hbaseManagement.createTable(s2config.HBASE_ZOOKEEPER_QUORUM, table, ColumnFamily.values.map(_.toString).toList, 1) http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/66be5c8c/s2counter_core/src/main/scala/s2/helper/CounterAdmin.scala ---------------------------------------------------------------------- diff --git a/s2counter_core/src/main/scala/s2/helper/CounterAdmin.scala b/s2counter_core/src/main/scala/s2/helper/CounterAdmin.scala index 8547bc8..3cf9181 100644 --- a/s2counter_core/src/main/scala/s2/helper/CounterAdmin.scala +++ b/s2counter_core/src/main/scala/s2/helper/CounterAdmin.scala @@ -1,5 +1,6 @@ package s2.helper +import com.kakao.s2graph.core.Graph import com.kakao.s2graph.core.mysqls.Label import com.typesafe.config.Config import play.api.libs.json.Json @@ -18,11 +19,13 @@ class CounterAdmin(config: Config) { val s2config = new S2CounterConfig(config) val counterModel = new CounterModel(config) val graphOp = new GraphOperation(config) + val s2graph = new Graph(config)(scala.concurrent.ExecutionContext.global) + val storageManagement = new com.kakao.s2graph.core.Management(s2graph) def setupCounterOnGraph(): Unit = { // create s2counter service val service = "s2counter" - com.kakao.s2graph.core.Management.createService(service, s2config.HBASE_ZOOKEEPER_QUORUM, s"$service-${config.getString("phase")}", 1, None, "gz") + storageManagement.createService(service, s2config.HBASE_ZOOKEEPER_QUORUM, s"$service-${config.getString("phase")}", 1, None, "gz") // create bucket label val label = "s2counter_topK_bucket" if (Label.findByName(label, useCache = false).isEmpty) { http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/66be5c8c/s2counter_core/src/test/scala/s2/counter/core/RankingCounterSpec.scala ---------------------------------------------------------------------- diff --git a/s2counter_core/src/test/scala/s2/counter/core/RankingCounterSpec.scala b/s2counter_core/src/test/scala/s2/counter/core/RankingCounterSpec.scala index dd6a6bf..ea67a2a 100644 --- a/s2counter_core/src/test/scala/s2/counter/core/RankingCounterSpec.scala +++ b/s2counter_core/src/test/scala/s2/counter/core/RankingCounterSpec.scala @@ -1,5 +1,6 @@ package s2.counter.core +import com.kakao.s2graph.core.{Management, Graph} import com.kakao.s2graph.core.mysqls.Label import com.typesafe.config.ConfigFactory import org.specs2.mutable.Specification @@ -70,7 +71,9 @@ class RankingCounterSpec extends Specification with BeforeAfterAll { } val graphOp = new GraphOperation(config) - com.kakao.s2graph.core.Management.createService(service, s2config.HBASE_ZOOKEEPER_QUORUM, s"${service}_dev", 1, None, "gz") + val graph = new Graph(config)(scala.concurrent.ExecutionContext.global) + val management = new Management(graph) + management.createService(service, s2config.HBASE_ZOOKEEPER_QUORUM, s"${service}_dev", 1, None, "gz") val strJs = s""" |{ http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/66be5c8c/s2rest_play/app/Bootstrap.scala ---------------------------------------------------------------------- diff --git a/s2rest_play/app/Bootstrap.scala b/s2rest_play/app/Bootstrap.scala index a661ac3..a1acb10 100644 --- a/s2rest_play/app/Bootstrap.scala +++ b/s2rest_play/app/Bootstrap.scala @@ -5,7 +5,7 @@ import java.util.concurrent.Executors import actors.QueueActor import com.kakao.s2graph.core.rest.RequestParser import com.kakao.s2graph.core.utils.logger -import com.kakao.s2graph.core.{ExceptionHandler, Graph} +import com.kakao.s2graph.core.{Management, ExceptionHandler, Graph} import config.Config import controllers.{AdminController, ApplicationController} import play.api.Application @@ -18,6 +18,7 @@ import scala.util.Try object Global extends WithFilters(new GzipFilter()) { var s2graph: Graph = _ + var storageManagement: Management = _ var s2parser: RequestParser = _ // Application entry point @@ -32,6 +33,7 @@ object Global extends WithFilters(new GzipFilter()) { // init s2graph with config s2graph = new Graph(config)(ec) + storageManagement = new Management(s2graph) s2parser = new RequestParser(s2graph.config) // merged config QueueActor.init(s2graph) http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/66be5c8c/s2rest_play/app/controllers/AdminController.scala ---------------------------------------------------------------------- diff --git a/s2rest_play/app/controllers/AdminController.scala b/s2rest_play/app/controllers/AdminController.scala index 2eb248b..bb73c40 100644 --- a/s2rest_play/app/controllers/AdminController.scala +++ b/s2rest_play/app/controllers/AdminController.scala @@ -14,6 +14,7 @@ import scala.util.{Failure, Success, Try} object AdminController extends Controller { import ApplicationController._ + private val management: Management = com.kakao.s2graph.rest.Global.storageManagement private val requestParser: RequestParser = com.kakao.s2graph.rest.Global.s2parser /** @@ -181,9 +182,10 @@ object AdminController extends Controller { tryResponse(serviceTry)(_.toJson) } + def createServiceInner(jsValue: JsValue) = { val (serviceName, cluster, tableName, preSplitSize, ttl, compressionAlgorithm) = requestParser.toServiceElements(jsValue) - Management.createService(serviceName, cluster, tableName, preSplitSize, ttl, compressionAlgorithm) + management.createService(serviceName, cluster, tableName, preSplitSize, ttl, compressionAlgorithm) } /** @@ -195,9 +197,10 @@ object AdminController extends Controller { tryResponse(ret)(_.toJson) } + def createLabelInner(json: JsValue) = for { labelArgs <- requestParser.toLabelElements(json) - label <- (Management.createLabel _).tupled(labelArgs) + label <- (management.createLabel _).tupled(labelArgs) } yield label /** @@ -322,7 +325,7 @@ object AdminController extends Controller { * @return */ def copyLabel(oldLabelName: String, newLabelName: String) = Action { request => - val copyTry = Management.copyLabel(oldLabelName, newLabelName, Some(newLabelName)) + val copyTry = management.copyLabel(oldLabelName, newLabelName, Some(newLabelName)) tryResponse(copyTry)(_.label + "created") } @@ -408,7 +411,9 @@ object AdminController extends Controller { // Management.createTable(cluster, hTableName, List("e", "v"), preSplitSize, hTableTTL, compressionAlgorithm) request.body.asJson.map(_.validate[HTableParams] match { case JsSuccess(hTableParams, _) => { - Management.createTable(hTableParams.cluster, hTableParams.hTableName, List("e", "v"), hTableParams.preSplitSize, hTableParams.hTableTTL, hTableParams.compressionAlgorithm.getOrElse(Management.defaultCompressionAlgorithm)) + management.createTable(hTableParams.cluster, hTableParams.hTableName, List("e", "v"), + hTableParams.preSplitSize, hTableParams.hTableTTL, + hTableParams.compressionAlgorithm.getOrElse(Management.DefaultCompressionAlgorithm)) logger.info(hTableParams.toString()) ok(s"HTable was created.") }
