http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2counter_core/src/main/scala/s2/counter/core/v1/ExactStorageHBase.scala ---------------------------------------------------------------------- diff --git a/s2counter_core/src/main/scala/s2/counter/core/v1/ExactStorageHBase.scala b/s2counter_core/src/main/scala/s2/counter/core/v1/ExactStorageHBase.scala deleted file mode 100644 index a664de4..0000000 --- a/s2counter_core/src/main/scala/s2/counter/core/v1/ExactStorageHBase.scala +++ /dev/null @@ -1,327 +0,0 @@ -package s2.counter.core.v1 - -import com.kakao.s2graph.core.Graph -import com.typesafe.config.Config -import org.apache.hadoop.hbase.CellUtil -import org.apache.hadoop.hbase.client._ -import org.apache.hadoop.hbase.filter.{ColumnRangeFilter, FilterList} -import org.apache.hadoop.hbase.util.Bytes -import org.slf4j.LoggerFactory -import s2.config.S2CounterConfig -import s2.counter.core.ExactCounter.ExactValueMap -import s2.counter.core._ -import s2.helper.{Management, WithHBase} -import s2.models.Counter -import s2.models.Counter.ItemType - -import scala.collection.JavaConversions._ -import scala.concurrent.{ExecutionContext, Future} -import scala.util.{Failure, Success} - - -class ExactStorageHBase(config: Config) extends ExactStorage { - import ExactStorageHBase._ - - private val log = LoggerFactory.getLogger(getClass) - - lazy val s2config = new S2CounterConfig(config) - - private[counter] val withHBase = new WithHBase(config) - private[counter] val hbaseManagement = new Management(config) - - - - private def getTableName(policy: Counter): String = { - policy.hbaseTable.getOrElse(s2config.HBASE_TABLE_NAME) - } - - override def get(policy: Counter, - items: Seq[String], - timeRange: Seq[(TimedQualifier, TimedQualifier)]) - (implicit ec: ExecutionContext): Future[Seq[FetchedCounts]] = { - lazy val messageForLog = s"${policy.service}.${policy.action} $items $timeRange" - - val keys = { - for { - item <- items - } yield { - ExactKey(policy, item, checkItemType = true) - } - } - - val gets = { - for { - key <- keys - } yield { - val get = new Get(BytesUtilV1.toBytes(key)) - timeRange.map(t => intervalsMap(t._1.q)).distinct.foreach { cf => - get.addFamily(cf.toString.getBytes) - } - get.setFilter(new FilterList(FilterList.Operator.MUST_PASS_ONE, { - for { - (from, to) <- timeRange - } yield { - new ColumnRangeFilter( - BytesUtilV1.toBytes(from), true, - BytesUtilV1.toBytes(to.copy(ts = to.ts + 1)), false) - } - })) - } - } - - // println(s"$messageForLog $gets") - Future { - withHBase(getTableName(policy)) { table => - for { - (rst, key) <- table.get(gets).zip(keys) if !rst.isEmpty - } yield { - val qualifierWithCounts = { - for { - cell <- rst.listCells() - eq = BytesUtilV1.toExactQualifier(CellUtil.cloneQualifier(cell)) - } yield { - eq -> Bytes.toLong(CellUtil.cloneValue(cell)) - } - }.toMap - FetchedCounts(key, qualifierWithCounts) - } - } match { - case Success(rst) => rst - case Failure(ex) => - log.error(s"$ex: $messageForLog") - Nil - } - } - } - - override def get(policy: Counter, - items: Seq[String], - timeRange: Seq[(TimedQualifier, TimedQualifier)], - dimQuery: Map[String, Set[String]]) - (implicit ec: ExecutionContext): Future[Seq[FetchedCountsGrouped]] = { - get(policy, items, timeRange).map { fetchedLs => - for { - FetchedCounts(exactKey, qualifierWithCountMap) <- fetchedLs - } yield { - val intervalWithCountMap = qualifierWithCountMap - .filter { case (eq, v) => eq.checkDimensionEquality(dimQuery) } - .groupBy { case (eq, v) => (eq.tq.q, eq.dimKeyValues) } - FetchedCountsGrouped(exactKey, intervalWithCountMap) - } - } - } - - override def update(policy: Counter, counts: Seq[(ExactKeyTrait, ExactValueMap)]): Map[ExactKeyTrait, ExactValueMap] = { - // increment mutation to hbase - val increments = { - for { - (exactKey, values) <- counts - inc = new Increment(BytesUtilV1.toBytes(exactKey)) - } yield { - for { - (eq, value) <- values - } { - inc.addColumn(intervalsMap.apply(eq.tq.q).toString.getBytes, BytesUtilV1.toBytes(eq), value) - } - // add column by dimension - inc - } - } - - val results: Array[Object] = Array.fill(increments.size)(null) - - withHBase(getTableName(policy)) { table => - table.batch(increments, results) - } match { - case Failure(ex) => - log.error(s"${ex.getMessage}") - case _ => - } - - assert(counts.length == results.length) - - for { - ((exactKey, eqWithValue), result) <- counts.zip(results) - } yield { - val eqWithResult = result match { - case r: Result => - for { - (eq, value) <- eqWithValue - } yield { - val interval = eq.tq.q - val cf = intervalsMap(interval) - val result = Option(r.getColumnLatestCell(cf.toString.getBytes, BytesUtilV1.toBytes(eq))).map { cell => - Bytes.toLong(CellUtil.cloneValue(cell)) - }.getOrElse(-1l) - eq -> result - } - case ex: Throwable => - log.error(s"${ex.getMessage}: $exactKey") - Nil - case _ => - log.error(s"result is null: $exactKey") - Nil - } - (exactKey, eqWithResult.toMap) - } - }.toMap - - override def delete(policy: Counter, keys: Seq[ExactKeyTrait]): Unit = { - withHBase(getTableName(policy)) { table => - table.delete { - for { - key <- keys - } yield { - new Delete(BytesUtilV1.toBytes(key)) - } - } - } match { - case Failure(ex) => - log.error(ex.getMessage) - case _ => - } - } - - override def get(policy: Counter, - queries: Seq[(ExactKeyTrait, Seq[ExactQualifier])]) - (implicit ec: ExecutionContext): Future[Seq[FetchedCounts]] = { - lazy val messageForLog = s"${policy.service}.${policy.action} $queries" - - val gets = { - for { - (key, eqs) <- queries - } yield { - // println(s"$key $eqsGrouped") - val get = new Get(BytesUtilV1.toBytes(key)) - - for { - eq <- eqs - } { - val cf = intervalsMap(eq.tq.q) - get.addColumn(cf.toString.getBytes, BytesUtilV1.toBytes(eq)) - } - get - } - } - - Future { - withHBase(getTableName(policy)) { table => - for { - (rst, key) <- table.get(gets).zip(queries.map(_._1)) if !rst.isEmpty - } yield { - val qualifierWithCounts = { - for { - cell <- rst.listCells() - eq = BytesUtilV1.toExactQualifier(CellUtil.cloneQualifier(cell)) - } yield { - eq -> Bytes.toLong(CellUtil.cloneValue(cell)) - } - }.toMap - FetchedCounts(key, qualifierWithCounts) - } - } match { - case Success(rst) => rst.toSeq - case Failure(ex) => - log.error(s"$ex: $messageForLog") - Nil - } - } - } - - override def insertBlobValue(policy: Counter, keys: Seq[BlobExactKey]): Seq[Boolean] = { - val results: Array[Object] = Array.fill(keys.size)(null) - - val puts = keys.map { key => - val put = new Put(BytesUtilV1.toBytes(key)) - put.addColumn(blobCF, blobColumn, key.itemId.getBytes) - } - - withHBase(getTableName(policy)) { table => - table.batch(puts, results) - } match { - case Failure(ex) => - log.error(s"${ex.getMessage}") - case _ => - } - - for { - (result, key) <- results.zip(keys) - } yield { - Option(result).map(_ => true).getOrElse { - log.error(s"fail to insert blob value: $key") - false - } - } - } - - override def getBlobValue(policy: Counter, blobId: String): Option[String] = { - lazy val messageForLog = s"${policy.service}.${policy.action}.$blobId" - - policy.itemType match { - case ItemType.BLOB => - withHBase(getTableName(policy)) { table => - val rowKey = BytesUtilV1.toBytes(ExactKey(policy.id, policy.version, policy.itemType, blobId)) - val get = new Get(rowKey) - get.addColumn(blobCF, blobColumn) - table.get(get) - } match { - case Success(result) => - Option(result).filter(!_.isEmpty).map { rst => - Bytes.toString(rst.getValue(blobCF, blobColumn)) - } - case Failure(ex) => - throw ex - } - case _ => - log.warn(s"is not blob type counter. $messageForLog") - throw new Exception(s"is not blob type counter. $messageForLog") - } - } - - override def prepare(policy: Counter): Unit = { - // create hbase table - policy.hbaseTable.foreach { table => - - if (!hbaseManagement.tableExists(s2config.HBASE_ZOOKEEPER_QUORUM, table)) { - hbaseManagement.createTable(s2config.HBASE_ZOOKEEPER_QUORUM, table, - ColumnFamily.values.map(_.toString).toList, 1) - hbaseManagement.setTTL(s2config.HBASE_ZOOKEEPER_QUORUM, table, ColumnFamily.SHORT.toString, policy.ttl) - policy.dailyTtl.foreach { i => - hbaseManagement.setTTL(s2config.HBASE_ZOOKEEPER_QUORUM, table, ColumnFamily.LONG.toString, i * 24 * 60 * 60) - } - } - } - } - - override def destroy(policy: Counter): Unit = { - - } - - override def ready(policy: Counter): Boolean = { - policy.hbaseTable.map { table => - hbaseManagement.tableExists(s2config.HBASE_ZOOKEEPER_QUORUM, table) - }.getOrElse(true) - } -} - -object ExactStorageHBase { - import TimedQualifier.IntervalUnit._ - - object ColumnFamily extends Enumeration { - type ColumnFamily = Value - - val SHORT = Value("s") - val LONG = Value("l") - } - - val blobCF = ColumnFamily.LONG.toString.getBytes - val blobColumn = "b".getBytes - - val intervalsMap = Map( - MINUTELY -> ColumnFamily.SHORT, - HOURLY -> ColumnFamily.SHORT, - DAILY -> ColumnFamily.LONG, - MONTHLY -> ColumnFamily.LONG, - TOTAL -> ColumnFamily.LONG - ) -}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2counter_core/src/main/scala/s2/counter/core/v1/RankingStorageRedis.scala ---------------------------------------------------------------------- diff --git a/s2counter_core/src/main/scala/s2/counter/core/v1/RankingStorageRedis.scala b/s2counter_core/src/main/scala/s2/counter/core/v1/RankingStorageRedis.scala deleted file mode 100644 index ea15a9c..0000000 --- a/s2counter_core/src/main/scala/s2/counter/core/v1/RankingStorageRedis.scala +++ /dev/null @@ -1,189 +0,0 @@ -package s2.counter.core.v1 - -import java.lang - -import com.typesafe.config.Config -import org.slf4j.LoggerFactory -import redis.clients.jedis.Pipeline -import s2.counter.core.RankingCounter.RankingValueMap -import s2.counter.core.TimedQualifier.IntervalUnit -import s2.counter.core.{RankingKey, RankingResult, RankingStorage} -import s2.helper.WithRedis -import s2.models.{Counter, CounterModel} - -import scala.collection.JavaConversions._ -import scala.util.{Failure, Success} - -/** - * Created by hsleep([email protected]) on 15. 6. 22.. - */ -class RankingStorageRedis(config: Config) extends RankingStorage { - private[counter] val log = LoggerFactory.getLogger(this.getClass) - private[counter] val withRedis = new WithRedis(config) - - val counterModel = new CounterModel(config) - - val TOTAL = "_total_" - - /** - * ex1) - * dimension = "age.32" - * ex2) - * dimension = "age.gender.32.m" - * - */ - private def makeBucket(rankingKey: RankingKey): String = { - val policyId = rankingKey.policyId - val q = rankingKey.eq.tq.q - val ts = rankingKey.eq.tq.ts - val dimension = rankingKey.eq.dimension - if (dimension.nonEmpty) { - s"$policyId.$q.$ts.$dimension" - } - else { - s"$policyId.$q.$ts" - } - } - - override def getTopK(rankingKey: RankingKey, k: Int): Option[RankingResult] = { - val bucket = makeBucket(rankingKey) - withRedis.doBlockWithKey(bucket) { jedis => - jedis.zrevrangeByScoreWithScores(bucket, "+inf", "-inf", 0, k + 1).toSeq.map(t => (t.getElement, t.getScore)) - } match { - case Success(values) => - if (values.nonEmpty) { -// println(values) - Some(RankingResult(values.find(_._1 == TOTAL).map(_._2).getOrElse(-1d), values.filter(_._1 != TOTAL).take(k))) - } - else { - None - } - case Failure(ex) => - log.error(s"fail to get top k($ex). $rankingKey") - None - } - } - - private def getTTL(policyId: Int, intervalUnit: IntervalUnit.IntervalUnit): Option[Int] = { - counterModel.findById(policyId).flatMap { policy => - intervalUnit match { - case IntervalUnit.MINUTELY => Some(policy.ttl) - case IntervalUnit.HOURLY => Some(policy.ttl) - // default daily ttl 31 day - case IntervalUnit.DAILY => Some(policy.dailyTtl.getOrElse(31) * 24 * 3600) - case IntervalUnit.MONTHLY => policy.dailyTtl - case IntervalUnit.TOTAL => policy.dailyTtl - } - } - } - - override def update(key: RankingKey, value: RankingValueMap, k: Int): Unit = { - // update ranking by score - val bucket = makeBucket(key) - withRedis.doBlockWithKey(bucket) { jedis => - val pipeline = jedis.pipelined() - updateItem(pipeline, bucket, key, value, k) - pipeline.sync() - } match { - case Failure(ex) => - log.error(s"fail to update $key $value: $ex") - case _ => - } - } - - private def updateItem(pipeline: Pipeline, bucket: String, key: RankingKey, value: RankingValueMap, k: Int): Unit = { - val topSeq = value.map { case (item, rv) => - // jedis client accept only java's double - item -> rv.score.asInstanceOf[lang.Double] - }.toSeq.sortBy(_._2).takeRight(k) - pipeline.zadd(bucket, topSeq.toMap[String, lang.Double]) - pipeline.zincrby(bucket, value.mapValues(_.increment).values.sum, TOTAL) - pipeline.zremrangeByRank(bucket, 0, -(k + 1)) - // if ttl defined, set expire time to bucket - getTTL(key.policyId, key.eq.tq.q).foreach { ttl => - pipeline.expire(bucket, ttl) - } - } - - override def update(values: Seq[(RankingKey, RankingValueMap)], k: Int): Unit = { - values.map { case (key, value) => - (makeBucket(key), key, value) - }.groupBy { case (bucket, key, value) => - withRedis.getBucketIdx(bucket) - }.foreach { case (idx, seq) => - withRedis.doBlockWithIndex(idx) { jedis => - val pipeline = jedis.pipelined() - for { - (bucket, key, value) <- seq - } { - updateItem(pipeline, bucket, key, value, k) - } - pipeline.sync() - } match { - case Failure(ex) => - log.error(s"fail to update multi $idx: $ex") - case _ => - } - } - } - - override def delete(key: RankingKey): Unit = { - val bucket = makeBucket(key) - withRedis.doBlockWithKey(bucket) { jedis => - jedis.del(bucket) - } match { - case Success(deleted) => - log.info(s"success to delete $key") - case Failure(ex) => - log.error(s"fail to delete $key") - } - } - - override def getTopK(keys: Seq[RankingKey], k: Int): Seq[(RankingKey, RankingResult)] = { - keys.map { key => - (makeBucket(key), key) - }.groupBy { case (bucket, key) => - withRedis.getBucketIdx(bucket) - }.toSeq.par.flatMap { case (idx, seq) => - withRedis.doBlockWithIndex(idx) { jedis => - val pipeline = jedis.pipelined() - val keyWithRespLs = { - for { - (bucket, rankingKey) <- seq - } yield { - (rankingKey, pipeline.zrevrangeByScoreWithScores(bucket, "+inf", "-inf", 0, k + 1)) - } - } - pipeline.sync() - for { - (rankingKey, resp) <- keyWithRespLs - } yield { - (rankingKey, resp.get().toSeq.map { t => (t.getElement, t.getScore)}) - } - } match { - case Success(keyWithValues) => - for { - (rankingKey, values) <- keyWithValues - } yield { - val result = RankingResult(values.find(_._1 == TOTAL).map(_._2).getOrElse(-1d), values.filter(_._1 != TOTAL).take(k)) - (rankingKey, result) - } - case Failure(ex) => - Nil - } - } - }.seq - - override def prepare(policy: Counter): Unit = { - // do nothing - } - - override def destroy(policy: Counter): Unit = { - - } - - override def ready(policy: Counter): Boolean = { - // always return true - true - } -} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2counter_core/src/main/scala/s2/counter/core/v2/BytesUtilV2.scala ---------------------------------------------------------------------- diff --git a/s2counter_core/src/main/scala/s2/counter/core/v2/BytesUtilV2.scala b/s2counter_core/src/main/scala/s2/counter/core/v2/BytesUtilV2.scala deleted file mode 100644 index f839221..0000000 --- a/s2counter_core/src/main/scala/s2/counter/core/v2/BytesUtilV2.scala +++ /dev/null @@ -1,91 +0,0 @@ -package s2.counter.core.v2 - -import org.apache.hadoop.hbase.util._ -import s2.counter.core.TimedQualifier.IntervalUnit -import s2.counter.core._ -import s2.models.Counter.ItemType -import s2.util.Hashes - -import scala.collection.mutable.ArrayBuffer - -/** - * Created by hsleep([email protected]) on 15. 6. 11.. - */ -object BytesUtilV2 extends BytesUtil { - // ExactKey: [hash(1b)][version(1b)][policy(4b)][item(variable)] - val BUCKET_BYTE_SIZE = Bytes.SIZEOF_BYTE - val VERSION_BYTE_SIZE = Bytes.SIZEOF_BYTE - val POLICY_ID_SIZE = Bytes.SIZEOF_INT - - val INTERVAL_SIZE = Bytes.SIZEOF_BYTE - val TIMESTAMP_SIZE = Bytes.SIZEOF_LONG - val TIMED_QUALIFIER_SIZE = INTERVAL_SIZE + TIMESTAMP_SIZE - - override def getRowKeyPrefix(id: Int): Array[Byte] = { - Array(s2.counter.VERSION_2) ++ Bytes.toBytes(id) - } - - override def toBytes(key: ExactKeyTrait): Array[Byte] = { - val buff = new ArrayBuffer[Byte] - // hash byte - buff ++= Bytes.toBytes(Hashes.murmur3(key.itemKey)).take(BUCKET_BYTE_SIZE) - - // row key prefix - // version + policy id - buff ++= getRowKeyPrefix(key.policyId) - - buff ++= { - key.itemType match { - case ItemType.INT => Bytes.toBytes(key.itemKey.toInt) - case ItemType.LONG => Bytes.toBytes(key.itemKey.toLong) - case ItemType.STRING | ItemType.BLOB => Bytes.toBytes(key.itemKey) - } - } - buff.toArray - } - - override def toBytes(eq: ExactQualifier): Array[Byte] = { - val len = eq.dimKeyValues.map { case (k, v) => k.length + 2 + v.length + 2 }.sum - val pbr = new SimplePositionedMutableByteRange(len) - for { - v <- ExactQualifier.makeSortedDimension(eq.dimKeyValues) - } { - OrderedBytes.encodeString(pbr, v, Order.ASCENDING) - } - toBytes(eq.tq) ++ pbr.getBytes - } - - override def toBytes(tq: TimedQualifier): Array[Byte] = { - val pbr = new SimplePositionedMutableByteRange(INTERVAL_SIZE + 2 + TIMESTAMP_SIZE + 1) - OrderedBytes.encodeString(pbr, tq.q.toString, Order.ASCENDING) - OrderedBytes.encodeInt64(pbr, tq.ts, Order.DESCENDING) - pbr.getBytes - } - - private def decodeString(pbr: PositionedByteRange): Stream[String] = { - if (pbr.getRemaining > 0) { - Stream.cons(OrderedBytes.decodeString(pbr), decodeString(pbr)) - } - else { - Stream.empty - } - } - - override def toExactQualifier(bytes: Array[Byte]): ExactQualifier = { - val pbr = new SimplePositionedByteRange(bytes) - ExactQualifier(toTimedQualifier(pbr), { - val seqStr = decodeString(pbr).toSeq - val (keys, values) = seqStr.splitAt(seqStr.length / 2) - keys.zip(values).toMap - }) - } - - override def toTimedQualifier(bytes: Array[Byte]): TimedQualifier = { - val pbr = new SimplePositionedByteRange(bytes) - toTimedQualifier(pbr) - } - - def toTimedQualifier(pbr: PositionedByteRange): TimedQualifier = { - TimedQualifier(IntervalUnit.withName(OrderedBytes.decodeString(pbr)), OrderedBytes.decodeInt64(pbr)) - } -} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2counter_core/src/main/scala/s2/counter/core/v2/ExactStorageGraph.scala ---------------------------------------------------------------------- diff --git a/s2counter_core/src/main/scala/s2/counter/core/v2/ExactStorageGraph.scala b/s2counter_core/src/main/scala/s2/counter/core/v2/ExactStorageGraph.scala deleted file mode 100644 index 2b225c8..0000000 --- a/s2counter_core/src/main/scala/s2/counter/core/v2/ExactStorageGraph.scala +++ /dev/null @@ -1,346 +0,0 @@ -package s2.counter.core.v2 - -import com.kakao.s2graph.core.mysqls.Label -import com.typesafe.config.Config -import org.apache.http.HttpStatus -import org.slf4j.LoggerFactory -import play.api.libs.json._ -import s2.config.S2CounterConfig -import s2.counter.core.ExactCounter.ExactValueMap -import s2.counter.core._ -import s2.models.Counter -import s2.util.CartesianProduct - -import scala.concurrent.duration._ -import scala.concurrent.{Await, ExecutionContext, Future} - -/** - * Created by hsleep([email protected]) on 15. 8. 19.. - */ -object ExactStorageGraph { - case class RespGraph(success: Boolean, result: Long) - implicit val respGraphFormat = Json.format[RespGraph] - - // using play-ws without play app - private val builder = new com.ning.http.client.AsyncHttpClientConfig.Builder() - private val wsClient = new play.api.libs.ws.ning.NingWSClient(builder.build) -} - -case class ExactStorageGraph(config: Config) extends ExactStorage { - private val log = LoggerFactory.getLogger(this.getClass) - private val s2config = new S2CounterConfig(config) - - private val SERVICE_NAME = "s2counter" - private val COLUMN_NAME = "bucket" - private val labelPostfix = "_counts" - - val s2graphUrl = s2config.GRAPH_URL - val s2graphReadOnlyUrl = s2config.GRAPH_READONLY_URL - val graphOp = new GraphOperation(config) - - import ExactStorageGraph._ - - override def update(policy: Counter, counts: Seq[(ExactKeyTrait, ExactValueMap)]): Map[ExactKeyTrait, ExactValueMap] = { - import scala.concurrent.ExecutionContext.Implicits.global - - val (keyWithEq, reqJsLs) = toIncrementCountRequests(policy, counts).unzip(x => ((x._1, x._2), x._3)) - - val future = wsClient.url(s"$s2graphUrl/graphs/edges/incrementCount").post(Json.toJson(reqJsLs)).map { resp => - resp.status match { - case HttpStatus.SC_OK => - val respSeq = resp.json.as[Seq[RespGraph]] - - val keyWithEqResult = { - for { - ((key, eq), RespGraph(success, result)) <- keyWithEq.zip(respSeq) - } yield { - (key, (eq, result)) - } - }.groupBy(_._1).mapValues{ seq => seq.map(_._2).toMap } - keyWithEqResult - case _ => - throw new RuntimeException(s"update failed: $policy $counts") - } - } - Await.result(future, 10 second) - } - - def delete(policy: Counter, keys: Seq[ExactKeyTrait]): Unit = { - - } - - private def toIncrementCountRequests(policy: Counter, - counts: Seq[(ExactKeyTrait, ExactValueMap)]) - : Seq[(ExactKeyTrait, ExactQualifier, JsValue)] = { - val labelName = policy.action + labelPostfix - val timestamp = System.currentTimeMillis() - for { - (exactKey, values) <- counts - (eq, value) <- values - } yield { - val from = exactKey.itemKey - val to = eq.dimension - val json = Json.obj( - "timestamp" -> timestamp, - "operation" -> "incrementCount", - "from" -> from, - "to" -> to, - "label" -> labelName, - "props" -> Json.obj( - "_count" -> value, - "time_unit" -> eq.tq.q.toString, - "time_value" -> eq.tq.ts - ) - ) - (exactKey, eq, json) - } - } - - override def get(policy: Counter, - items: Seq[String], - timeRange: Seq[(TimedQualifier, TimedQualifier)], - dimQuery: Map[String, Set[String]]) - (implicit ex: ExecutionContext): Future[Seq[FetchedCountsGrouped]] = { - val labelName = policy.action + labelPostfix - val label = Label.findByName(labelName).get -// val label = labelModel.findByName(labelName).get - - val ids = Json.toJson(items) - - val dimensions = { - for { - values <- CartesianProduct(dimQuery.values.map(ss => ss.toList).toList) - } yield { - dimQuery.keys.zip(values).toMap - } - } - - val stepJsLs = { - for { - (tqFrom, tqTo) <- timeRange - dimension <- dimensions - } yield { - val eqFrom = ExactQualifier(tqFrom, dimension) - val eqTo = ExactQualifier(tqTo, dimension) - val intervalJs = - s""" - |{ - | "from": { - | "_to": "${eqFrom.dimension}", - | "time_unit": "${eqFrom.tq.q}", - | "time_value": ${eqFrom.tq.ts} - | }, - | "to": { - | "_to": "${eqTo.dimension}", - | "time_unit": "${eqTo.tq.q}", - | "time_value": ${eqTo.tq.ts + 1} - | } - |} - """.stripMargin - val stepJs = - s""" - |{ - | "direction": "out", - | "limit": -1, - | "duplicate": "raw", - | "label": "$labelName", - | "interval": $intervalJs - |} - """.stripMargin - stepJs - } - } - - val reqJsStr = - s""" - |{ - | "srcVertices": [ - | {"serviceName": "${policy.service}", "columnName": "${label.srcColumnName}", "ids": $ids} - | ], - | "steps": [ - | { - | "step": [ - | ${stepJsLs.mkString(",")} - | ] - | } - | ] - |} - """.stripMargin - - val reqJs = Json.parse(reqJsStr) -// log.warn(s"query: ${reqJs.toString()}") - - wsClient.url(s"$s2graphReadOnlyUrl/graphs/getEdges").post(reqJs).map { resp => - resp.status match { - case HttpStatus.SC_OK => - val respJs = resp.json -// println(respJs) - val keyWithValues = (respJs \ "results").as[Seq[JsValue]].map { result => -// println(s"result: $result") - resultToExactKeyValues(policy, result) - }.groupBy(_._1).mapValues(seq => seq.map(_._2).toMap.groupBy { case (eq, v) => (eq.tq.q, eq.dimKeyValues) }) - for { - (k, v) <- keyWithValues.toSeq - } yield { - FetchedCountsGrouped(k, v) - } - case n: Int => - log.warn(s"getEdges status($n): $reqJsStr") -// println(s"getEdges status($n): $reqJsStr") - Nil - } - } - } - - private def resultToExactKeyValues(policy: Counter, result: JsValue): (ExactKeyTrait, (ExactQualifier, Long)) = { - val from = result \ "from" match { - case s: JsString => s.as[String] - case n: JsNumber => n.as[Long].toString - case x: JsValue => throw new RuntimeException(s"$x's type must be string or number") - } - val dimension = (result \ "to").as[String] - val props = result \ "props" - val count = (props \ "_count").as[Long] - val timeUnit = (props \ "time_unit").as[String] - val timeValue = (props \ "time_value").as[Long] - (ExactKey(policy, from, checkItemType = true), (ExactQualifier(TimedQualifier(timeUnit, timeValue), dimension), count)) - } - - private def getInner(policy: Counter, key: ExactKeyTrait, eqs: Seq[ExactQualifier]) - (implicit ex: ExecutionContext): Future[Seq[FetchedCounts]] = { - val labelName = policy.action + labelPostfix - - Label.findByName(labelName) match { - case Some(label) => - - val src = Json.obj("serviceName" -> policy.service, "columnName" -> label.srcColumnName, "id" -> key.itemKey) - val step = { - val stepLs = { - for { - eq <- eqs - } yield { - val from = Json.obj("_to" -> eq.dimension, "time_unit" -> eq.tq.q.toString, "time_value" -> eq.tq.ts) - val to = Json.obj("_to" -> eq.dimension, "time_unit" -> eq.tq.q.toString, "time_value" -> eq.tq.ts) - val interval = Json.obj("from" -> from, "to" -> to) - Json.obj("limit" -> 1, "label" -> labelName, "interval" -> interval) - } - } - Json.obj("step" -> stepLs) - } - val query = Json.obj("srcVertices" -> Json.arr(src), "steps" -> Json.arr(step)) - // println(s"query: ${query.toString()}") - - wsClient.url(s"$s2graphReadOnlyUrl/graphs/getEdges").post(query).map { resp => - resp.status match { - case HttpStatus.SC_OK => - val respJs = resp.json - val keyWithValues = (respJs \ "results").as[Seq[JsValue]].map { result => - resultToExactKeyValues(policy, result) - }.groupBy(_._1).mapValues(seq => seq.map(_._2).toMap) - for { - (key, eqWithValues) <- keyWithValues.toSeq - } yield { - FetchedCounts(key, eqWithValues) - } - case _ => - Nil - } - } - case None => - Future.successful(Nil) - } - } - - // for query exact qualifier - override def get(policy: Counter, queries: Seq[(ExactKeyTrait, Seq[ExactQualifier])]) - (implicit ex: ExecutionContext): Future[Seq[FetchedCounts]] = { - val futures = { - for { - (key, eqs) <- queries - } yield { -// println(s"$key $eqs") - getInner(policy, key, eqs) - } - } - Future.sequence(futures).map(_.flatten) - } - - override def getBlobValue(policy: Counter, blobId: String): Option[String] = { - throw new RuntimeException("unsupported getBlobValue operation") - } - - override def insertBlobValue(policy: Counter, keys: Seq[BlobExactKey]): Seq[Boolean] = { - throw new RuntimeException("unsupported insertBlobValue operation") - } - - private def existsLabel(policy: Counter, useCache: Boolean = true): Boolean = { - val action = policy.action - val counterLabelName = action + labelPostfix - - Label.findByName(counterLabelName, useCache).nonEmpty - } - - override def prepare(policy: Counter): Unit = { - val service = policy.service - val action = policy.action - - val graphLabel = Label.findByName(action) - if (graphLabel.isEmpty) { - throw new Exception(s"label not found. $service.$action") - } - - if (!existsLabel(policy, useCache = false)) { - val label = Label.findByName(action, useCache = false).get - - val counterLabelName = action + labelPostfix - val defaultJson = - s""" - |{ - | "label": "$counterLabelName", - | "srcServiceName": "$service", - | "srcColumnName": "${label.tgtColumnName}", - | "srcColumnType": "${label.tgtColumnType}", - | "tgtServiceName": "$SERVICE_NAME", - | "tgtColumnName": "$COLUMN_NAME", - | "tgtColumnType": "string", - | "indices": [ - | {"name": "time", "propNames": ["_to", "time_unit", "time_value"]} - | ], - | "props": [ - | {"name": "time_unit", "dataType": "string", "defaultValue": ""}, - | {"name": "time_value", "dataType": "long", "defaultValue": 0} - | ], - | "hTableName": "${policy.hbaseTable.get}" - |} - """.stripMargin - val json = policy.dailyTtl.map(ttl => ttl * 24 * 60 * 60) match { - case Some(ttl) => - Json.parse(defaultJson).as[JsObject] + ("hTableTTL" -> Json.toJson(ttl)) - case None => - Json.parse(defaultJson) - } - - graphOp.createLabel(json) - } - } - - override def destroy(policy: Counter): Unit = { - val action = policy.action - - if (existsLabel(policy, useCache = false)) { - val counterLabelName = action + labelPostfix - - graphOp.deleteLabel(counterLabelName) - } - } - - override def ready(policy: Counter): Boolean = { - existsLabel(policy) - } - - // for range query - override def get(policy: Counter, items: Seq[String], timeRange: Seq[(TimedQualifier, TimedQualifier)]) - (implicit ec: ExecutionContext): Future[Seq[FetchedCounts]] = { - throw new NotImplementedError("Not implemented") - } -} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2counter_core/src/main/scala/s2/counter/core/v2/GraphOperation.scala ---------------------------------------------------------------------- diff --git a/s2counter_core/src/main/scala/s2/counter/core/v2/GraphOperation.scala b/s2counter_core/src/main/scala/s2/counter/core/v2/GraphOperation.scala deleted file mode 100644 index 0f43a52..0000000 --- a/s2counter_core/src/main/scala/s2/counter/core/v2/GraphOperation.scala +++ /dev/null @@ -1,52 +0,0 @@ -package s2.counter.core.v2 - -import com.typesafe.config.Config -import org.apache.http.HttpStatus -import org.slf4j.LoggerFactory -import play.api.libs.json.{JsObject, JsValue, Json} -import s2.config.S2CounterConfig - -import scala.concurrent.Await -import scala.concurrent.duration._ - -/** - * Created by hsleep([email protected]) on 2015. 11. 10.. - */ -class GraphOperation(config: Config) { - // using play-ws without play app - private val builder = new com.ning.http.client.AsyncHttpClientConfig.Builder() - private val wsClient = new play.api.libs.ws.ning.NingWSClient(builder.build) - private val s2config = new S2CounterConfig(config) - val s2graphUrl = s2config.GRAPH_URL - private[counter] val log = LoggerFactory.getLogger(this.getClass) - - import scala.concurrent.ExecutionContext.Implicits.global - - def createLabel(json: JsValue): Boolean = { - // fix counter label's schemaVersion - val newJson = json.as[JsObject] ++ Json.obj("schemaVersion" -> "v2") - val future = wsClient.url(s"$s2graphUrl/graphs/createLabel").post(newJson).map { resp => - resp.status match { - case HttpStatus.SC_OK => - true - case _ => - throw new RuntimeException(s"failed createLabel. errCode: ${resp.status} body: ${resp.body} query: $json") - } - } - - Await.result(future, 10 second) - } - - def deleteLabel(label: String): Boolean = { - val future = wsClient.url(s"$s2graphUrl/graphs/deleteLabel/$label").put("").map { resp => - resp.status match { - case HttpStatus.SC_OK => - true - case _ => - throw new RuntimeException(s"failed deleteLabel. errCode: ${resp.status} body: ${resp.body}") - } - } - - Await.result(future, 10 second) - } -} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2counter_core/src/main/scala/s2/counter/core/v2/RankingStorageGraph.scala ---------------------------------------------------------------------- diff --git a/s2counter_core/src/main/scala/s2/counter/core/v2/RankingStorageGraph.scala b/s2counter_core/src/main/scala/s2/counter/core/v2/RankingStorageGraph.scala deleted file mode 100644 index 18d7eda..0000000 --- a/s2counter_core/src/main/scala/s2/counter/core/v2/RankingStorageGraph.scala +++ /dev/null @@ -1,399 +0,0 @@ -package s2.counter.core.v2 - -import com.kakao.s2graph.core.GraphUtil -import com.kakao.s2graph.core.mysqls.Label -import com.typesafe.config.Config -import org.apache.commons.httpclient.HttpStatus -import org.slf4j.LoggerFactory -import play.api.libs.json.{JsObject, JsString, JsValue, Json} -import s2.config.S2CounterConfig -import s2.counter.core.RankingCounter.RankingValueMap -import s2.counter.core.{RankingKey, RankingResult, RankingStorage} -import s2.models.{Counter, CounterModel} -import s2.util.{CollectionCache, CollectionCacheConfig} - -import scala.concurrent.duration._ -import scala.concurrent.{Await, Future} -import scala.util.hashing.MurmurHash3 - -/** - * Created by shon on 7/28/15. - */ -object RankingStorageGraph { - // using play-ws without play app - private val builder = new com.ning.http.client.AsyncHttpClientConfig.Builder() - private val wsClient = new play.api.libs.ws.ning.NingWSClient(builder.build) -} - -class RankingStorageGraph(config: Config) extends RankingStorage { - import RankingStorageGraph._ - - private[counter] val log = LoggerFactory.getLogger(this.getClass) - private val s2config = new S2CounterConfig(config) - - private val BUCKET_SHARD_COUNT = 53 - private val SERVICE_NAME = "s2counter" - private val BUCKET_COLUMN_NAME = "bucket" - private val counterModel = new CounterModel(config) - private val labelPostfix = "_topK" - - val s2graphUrl = s2config.GRAPH_URL - val s2graphReadOnlyUrl = s2config.GRAPH_READONLY_URL - - val prepareCache = new CollectionCache[Option[Boolean]](CollectionCacheConfig(10000, 600)) - val graphOp = new GraphOperation(config) - import scala.concurrent.ExecutionContext.Implicits.global - - private def makeBucketKey(rankingKey: RankingKey): String = { - val eq = rankingKey.eq - val tq = eq.tq - s"${tq.q}.${tq.ts}.${eq.dimension}" - } - - // "", "age.32", "age.gender.32.M" - private def makeBucketShardKey(shardIdx: Int, rankingKey: RankingKey): String = { - s"$shardIdx.${makeBucketKey(rankingKey)}" - } - - /** - * indexProps: ["time_unit", "time_value", "score"] - */ - override def getTopK(key: RankingKey, k: Int): Option[RankingResult] = { - getTopK(Seq(key), k).headOption.map(_._2) - } - - override def getTopK(keys: Seq[RankingKey], k: Int): Seq[(RankingKey, RankingResult)] = { - val futures = for { - key <- keys - } yield { - getEdges(key).map { edges => - key -> RankingResult(0d, toWithScoreLs(edges).take(k)) - } - } - - Await.result(Future.sequence(futures), 10 seconds) - } - - override def update(key: RankingKey, value: RankingValueMap, k: Int): Unit = { - update(Seq((key, value)), k) - } - - override def update(values: Seq[(RankingKey, RankingValueMap)], k: Int): Unit = { - val futures = { - for { - (key, value) <- values - } yield { - // prepare dimension bucket edge - checkAndPrepareDimensionBucket(key) - - val future = getEdges(key, "raw").flatMap { edges => - val prevRankingSeq = toWithScoreLs(edges) - val prevRankingMap: Map[String, Double] = prevRankingSeq.groupBy(_._1).map(_._2.sortBy(-_._2).head) - val currentRankingMap: Map[String, Double] = value.mapValues(_.score) - val mergedRankingSeq = (prevRankingMap ++ currentRankingMap).toSeq.sortBy(-_._2).take(k) - val mergedRankingMap = mergedRankingSeq.toMap - - val bucketRankingSeq = mergedRankingSeq.groupBy { case (itemId, score) => - // 0-index - GraphUtil.transformHash(MurmurHash3.stringHash(itemId)) % BUCKET_SHARD_COUNT - }.map { case (shardIdx, groupedRanking) => - shardIdx -> groupedRanking.filter { case (itemId, _) => currentRankingMap.contains(itemId) } - }.toSeq - - insertBulk(key, bucketRankingSeq).flatMap { _ => - val duplicatedItems = prevRankingMap.filterKeys(s => currentRankingMap.contains(s)) - val cutoffItems = prevRankingMap.filterKeys(s => !mergedRankingMap.contains(s)) - val deleteItems = duplicatedItems ++ cutoffItems - - val keyWithEdgesLs = prevRankingSeq.map(_._1).zip(edges) - val deleteEdges = keyWithEdgesLs.filter{ case (s, _) => deleteItems.contains(s) }.map(_._2) - - deleteAll(deleteEdges) - } - } - - future - } - } - - Await.result(Future.sequence(futures), 10 seconds) - } - - private def toWithScoreLs(edges: List[JsValue]): List[(String, Double)] = { - for { - edgeJson <- edges - to = (edgeJson \ "to").as[JsValue] - score = (edgeJson \ "score").as[JsValue].toString().toDouble - } yield { - val toValue = to match { - case s: JsString => s.as[String] - case _ => to.toString() - } - toValue -> score - } - } - - private def insertBulk(key: RankingKey, newRankingSeq: Seq[(Int, Seq[(String, Double)])]): Future[Boolean] = { - val labelName = counterModel.findById(key.policyId).get.action + labelPostfix - val timestamp: Long = System.currentTimeMillis - val payload = Json.toJson { - for { - (shardIdx, rankingSeq) <- newRankingSeq - (itemId, score) <- rankingSeq - } yield { - val srcId = makeBucketShardKey(shardIdx, key) - Json.obj( - "timestamp" -> timestamp, - "from" -> srcId, - "to" -> itemId, - "label" -> labelName, - "props" -> Json.obj( - "time_unit" -> key.eq.tq.q.toString, - "time_value" -> key.eq.tq.ts, - "date_time" -> key.eq.tq.dateTime, - "score" -> score - ) - ) - } - } - - wsClient.url(s"$s2graphUrl/graphs/edges/insertBulk").post(payload).map { resp => - resp.status match { - case HttpStatus.SC_OK => - true - case _ => - throw new RuntimeException(s"failed insertBulk. errCode: ${resp.status}, body: ${resp.body}, query: $payload") - } - } - } - - private def deleteAll(edges: List[JsValue]): Future[Boolean] = { - // /graphs/edges/delete - val futures = { - for { - groupedEdges <- edges.grouped(50) - } yield { - val payload = Json.toJson(groupedEdges) - wsClient.url(s"$s2graphUrl/graphs/edges/delete").post(payload).map { resp => - resp.status match { - case HttpStatus.SC_OK => - true - case _ => - log.error(s"failed delete. errCode: ${resp.status}, body: ${resp.body}, query: $payload") - false - } - } - } - }.toSeq - - Future.sequence(futures).map { seq => - seq.forall(x => x) - } - } - - /** select and delete */ - override def delete(key: RankingKey): Unit = { - val future = getEdges(key).flatMap { edges => - deleteAll(edges) - } - Await.result(future, 10 second) - } - - private def getEdges(key: RankingKey, duplicate: String="first"): Future[List[JsValue]] = { - val labelName = counterModel.findById(key.policyId).get.action + labelPostfix - - val ids = { - (0 until BUCKET_SHARD_COUNT).map { shardIdx => - s""""${makeBucketShardKey(shardIdx, key)}"""" - } - }.mkString(",") - - val strJs = - s""" - |{ - | "srcVertices": [ - | { - | "serviceName": "$SERVICE_NAME", - | "columnName": "$BUCKET_COLUMN_NAME", - | "ids": [$ids] - | } - | ], - | "steps": [ - | { - | "step": [ - | { - | "label": "$labelName", - | "duplicate": "$duplicate", - | "direction": "out", - | "offset": 0, - | "limit": -1, - | "interval": { - | "from": {"time_unit": "${key.eq.tq.q.toString}", "time_value": ${key.eq.tq.ts}}, - | "to": {"time_unit": "${key.eq.tq.q.toString}", "time_value": ${key.eq.tq.ts}} - | }, - | "scoring": {"score": 1} - | } - | ] - | } - | ] - |} - """.stripMargin - log.debug(strJs) - - val payload = Json.parse(strJs) - wsClient.url(s"$s2graphReadOnlyUrl/graphs/getEdges").post(payload).map { resp => - resp.status match { - case HttpStatus.SC_OK => - (resp.json \ "results").asOpt[List[JsValue]].getOrElse(Nil) - case _ => - throw new RuntimeException(s"failed getEdges. errCode: ${resp.status}, body: ${resp.body}, query: $payload") - } - } - } - - private def existsLabel(policy: Counter, useCache: Boolean = true): Boolean = { - val action = policy.action - val counterLabelName = action + labelPostfix - - Label.findByName(counterLabelName, useCache).nonEmpty - } - - private def checkAndPrepareDimensionBucket(rankingKey: RankingKey): Boolean = { - val dimension = rankingKey.eq.dimension - val bucketKey = makeBucketKey(rankingKey) - val labelName = "s2counter_topK_bucket" - - val prepared = prepareCache.withCache(s"$dimension:$bucketKey") { - val checkReqJs = Json.arr( - Json.obj( - "label" -> labelName, - "direction" -> "out", - "from" -> dimension, - "to" -> makeBucketShardKey(BUCKET_SHARD_COUNT - 1, rankingKey) - ) - ) - - val future = wsClient.url(s"$s2graphReadOnlyUrl/graphs/checkEdges").post(checkReqJs).map { resp => - resp.status match { - case HttpStatus.SC_OK => - val checkRespJs = resp.json - if (checkRespJs.as[Seq[JsValue]].nonEmpty) { - true - } else { - false - } - case _ => - // throw exception - throw new Exception(s"failed checkEdges. ${resp.body} ${resp.status}") - } - }.flatMap { - case true => Future.successful(Some(true)) - case false => - val insertReqJsLs = { - for { - i <- 0 until BUCKET_SHARD_COUNT - } yield { - Json.obj( - "timestamp" -> rankingKey.eq.tq.ts, - "from" -> dimension, - "to" -> makeBucketShardKey(i, rankingKey), - "label" -> labelName, - "props" -> Json.obj( - "time_unit" -> rankingKey.eq.tq.q.toString, - "date_time" -> rankingKey.eq.tq.dateTime - ) - ) - } - } - wsClient.url(s"$s2graphUrl/graphs/edges/insert").post(Json.toJson(insertReqJsLs)).map { resp => - resp.status match { - case HttpStatus.SC_OK => - Some(true) - case _ => - // throw exception - throw new Exception(s"failed insertEdges. ${resp.body} ${resp.status}") - } - } - }.recover { - case e: Exception => - None - } - - Await.result(future, 10 second) - } - prepared.getOrElse(false) - } - - override def prepare(policy: Counter): Unit = { - val service = policy.service - val action = policy.action - - val graphLabel = { - policy.rateActionId match { - case Some(rateId) => - counterModel.findById(rateId, useCache = false).flatMap { ratePolicy => - Label.findByName(ratePolicy.action) - } - case None => - Label.findByName(action) - } - } - if (graphLabel.isEmpty) { - throw new Exception(s"label not found. $service.$action") - } - - if (!existsLabel(policy, useCache = false)) { - // find input label to specify target column - val inputLabelName = policy.rateActionId.flatMap { id => - counterModel.findById(id, useCache = false).map(_.action) - }.getOrElse(action) - val label = graphLabel.get - - val counterLabelName = action + labelPostfix - val defaultJson = - s""" - |{ - | "label": "$counterLabelName", - | "srcServiceName": "$SERVICE_NAME", - | "srcColumnName": "$BUCKET_COLUMN_NAME", - | "srcColumnType": "string", - | "tgtServiceName": "$service", - | "tgtColumnName": "${label.tgtColumnName}", - | "tgtColumnType": "${label.tgtColumnType}", - | "indices": [ - | {"name": "time", "propNames": ["time_unit", "time_value", "score"]} - | ], - | "props": [ - | {"name": "time_unit", "dataType": "string", "defaultValue": ""}, - | {"name": "time_value", "dataType": "long", "defaultValue": 0}, - | {"name": "date_time", "dataType": "long", "defaultValue": 0}, - | {"name": "score", "dataType": "float", "defaultValue": 0.0} - | ], - | "hTableName": "${policy.hbaseTable.get}" - |} - """.stripMargin - val json = policy.dailyTtl.map(ttl => ttl * 24 * 60 * 60) match { - case Some(ttl) => - Json.parse(defaultJson).as[JsObject] + ("hTableTTL" -> Json.toJson(ttl)) - case None => - Json.parse(defaultJson) - } - - graphOp.createLabel(json) - } - } - - override def destroy(policy: Counter): Unit = { - val action = policy.action - - if (existsLabel(policy, useCache = false)) { - val counterLabelName = action + labelPostfix - - graphOp.deleteLabel(counterLabelName) - } - } - - override def ready(policy: Counter): Boolean = { - existsLabel(policy) - } -} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2counter_core/src/main/scala/s2/counter/decay/DecayFormula.scala ---------------------------------------------------------------------- diff --git a/s2counter_core/src/main/scala/s2/counter/decay/DecayFormula.scala b/s2counter_core/src/main/scala/s2/counter/decay/DecayFormula.scala deleted file mode 100644 index 66e1b93..0000000 --- a/s2counter_core/src/main/scala/s2/counter/decay/DecayFormula.scala +++ /dev/null @@ -1,9 +0,0 @@ -package s2.counter.decay - -/** - * Created by hsleep([email protected]) on 15. 6. 26.. - */ -trait DecayFormula { - def apply(value: Double, millis: Long): Double - def apply(value: Double, seconds: Int): Double -} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2counter_core/src/main/scala/s2/counter/decay/ExpDecayFormula.scala ---------------------------------------------------------------------- diff --git a/s2counter_core/src/main/scala/s2/counter/decay/ExpDecayFormula.scala b/s2counter_core/src/main/scala/s2/counter/decay/ExpDecayFormula.scala deleted file mode 100644 index 6de9f69..0000000 --- a/s2counter_core/src/main/scala/s2/counter/decay/ExpDecayFormula.scala +++ /dev/null @@ -1,28 +0,0 @@ -package s2.counter.decay - -/** - * Created by hsleep([email protected]) on 15. 6. 26.. - */ -case class ExpDecayFormula(halfLifeInMillis: Double) extends DecayFormula { - val decayRate = - Math.log(2) / halfLifeInMillis - - override def apply(value: Double, millis: Long): Double = { - value * Math.pow(Math.E, decayRate * millis) - } - - override def apply(value: Double, seconds: Int): Double = { - apply(value, seconds * 1000L) - } -} - -object ExpDecayFormula { - @deprecated("do not use. just experimental", "0.14") - def byWindowTime(windowInMillis: Long, pct: Double): ExpDecayFormula = { - val halfLife = windowInMillis * Math.log(0.5) / Math.log(pct) - ExpDecayFormula(halfLife) - } - - def byMeanLifeTime(millis: Long): ExpDecayFormula = { - ExpDecayFormula(millis * Math.log(2)) - } -} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2counter_core/src/main/scala/s2/counter/package.scala ---------------------------------------------------------------------- diff --git a/s2counter_core/src/main/scala/s2/counter/package.scala b/s2counter_core/src/main/scala/s2/counter/package.scala deleted file mode 100644 index 40a9e41..0000000 --- a/s2counter_core/src/main/scala/s2/counter/package.scala +++ /dev/null @@ -1,11 +0,0 @@ -package s2 - -/** - * Created by hsleep([email protected]) on 15. 5. 22.. - */ -package object counter { - val VERSION_1: Byte = 1 - val VERSION_2: Byte = 2 - - case class MethodNotSupportedException(message: String, cause: Throwable = null) extends Exception(message, cause) -} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2counter_core/src/main/scala/s2/helper/CounterAdmin.scala ---------------------------------------------------------------------- diff --git a/s2counter_core/src/main/scala/s2/helper/CounterAdmin.scala b/s2counter_core/src/main/scala/s2/helper/CounterAdmin.scala deleted file mode 100644 index 3cf9181..0000000 --- a/s2counter_core/src/main/scala/s2/helper/CounterAdmin.scala +++ /dev/null @@ -1,114 +0,0 @@ -package s2.helper - -import com.kakao.s2graph.core.Graph -import com.kakao.s2graph.core.mysqls.Label -import com.typesafe.config.Config -import play.api.libs.json.Json -import s2.config.S2CounterConfig -import s2.counter.core.v1.{ExactStorageAsyncHBase, RankingStorageRedis} -import s2.counter.core.v2.{ExactStorageGraph, GraphOperation, RankingStorageGraph} -import s2.counter.core.{ExactCounter, RankingCounter} -import s2.models.{Counter, CounterModel} - -import scala.util.Try - -/** - * Created by hsleep([email protected]) on 2015. 11. 11.. - */ -class CounterAdmin(config: Config) { - val s2config = new S2CounterConfig(config) - val counterModel = new CounterModel(config) - val graphOp = new GraphOperation(config) - val s2graph = new Graph(config)(scala.concurrent.ExecutionContext.global) - val storageManagement = new com.kakao.s2graph.core.Management(s2graph) - - def setupCounterOnGraph(): Unit = { - // create s2counter service - val service = "s2counter" - storageManagement.createService(service, s2config.HBASE_ZOOKEEPER_QUORUM, s"$service-${config.getString("phase")}", 1, None, "gz") - // create bucket label - val label = "s2counter_topK_bucket" - if (Label.findByName(label, useCache = false).isEmpty) { - val strJs = - s""" - |{ - | "label": "$label", - | "srcServiceName": "s2counter", - | "srcColumnName": "dimension", - | "srcColumnType": "string", - | "tgtServiceName": "s2counter", - | "tgtColumnName": "bucket", - | "tgtColumnType": "string", - | "indices": [ - | {"name": "time", "propNames": ["time_unit", "date_time"]} - | ], - | "props": [ - | {"name": "time_unit", "dataType": "string", "defaultValue": ""}, - | {"name": "date_time", "dataType": "long", "defaultValue": 0} - | ], - | "hTableName": "s2counter_60", - | "hTableTTL": 5184000 - |} - """.stripMargin - graphOp.createLabel(Json.parse(strJs)) - } - } - - def createCounter(policy: Counter): Unit = { - val newPolicy = policy.copy(hbaseTable = Some(makeHTableName(policy))) - prepareStorage(newPolicy) - counterModel.createServiceAction(newPolicy) - } - - def deleteCounter(service: String, action: String): Option[Try[Unit]] = { - for { - policy <- counterModel.findByServiceAction(service, action, useCache = false) - } yield { - Try { - exactCounter(policy).destroy(policy) - if (policy.useRank) { - rankingCounter(policy).destroy(policy) - } - counterModel.deleteServiceAction(policy) - } - } - } - - def prepareStorage(policy: Counter): Unit = { - if (policy.rateActionId.isEmpty) { - // if defined rate action, do not use exact counter - exactCounter(policy).prepare(policy) - } - if (policy.useRank) { - rankingCounter(policy).prepare(policy) - } - } - - def prepareStorage(policy: Counter, version: Byte): Unit = { - // this function to prepare storage by version parameter instead of policy.version - prepareStorage(policy.copy(version = version)) - } - - private val exactCounterMap = Map( - s2.counter.VERSION_1 -> new ExactCounter(config, new ExactStorageAsyncHBase(config)), - s2.counter.VERSION_2 -> new ExactCounter(config, new ExactStorageGraph(config)) - ) - private val rankingCounterMap = Map( - s2.counter.VERSION_1 -> new RankingCounter(config, new RankingStorageRedis(config)), - s2.counter.VERSION_2 -> new RankingCounter(config, new RankingStorageGraph(config)) - ) - - private val tablePrefixMap = Map ( - s2.counter.VERSION_1 -> "s2counter", - s2.counter.VERSION_2 -> "s2counter_v2" - ) - - def exactCounter(version: Byte): ExactCounter = exactCounterMap(version) - def exactCounter(policy: Counter): ExactCounter = exactCounter(policy.version) - def rankingCounter(version: Byte): RankingCounter = rankingCounterMap(version) - def rankingCounter(policy: Counter): RankingCounter = rankingCounter(policy.version) - - def makeHTableName(policy: Counter): String = { - Seq(tablePrefixMap(policy.version), policy.service, policy.ttl) ++ policy.dailyTtl mkString "_" - } -} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2counter_core/src/main/scala/s2/helper/DistributedScanner.scala ---------------------------------------------------------------------- diff --git a/s2counter_core/src/main/scala/s2/helper/DistributedScanner.scala b/s2counter_core/src/main/scala/s2/helper/DistributedScanner.scala deleted file mode 100644 index dcf4d03..0000000 --- a/s2counter_core/src/main/scala/s2/helper/DistributedScanner.scala +++ /dev/null @@ -1,74 +0,0 @@ -package s2.helper - -import java.util -import java.util.Comparator - -import com.google.common.primitives.SignedBytes -import org.apache.hadoop.hbase.client._ -import org.apache.hadoop.hbase.util.Bytes - -/** - * Created by hsleep([email protected]) on 15. 5. 21.. - */ - -object DistributedScanner { - val BUCKET_BYTE_SIZE = Bytes.SIZEOF_BYTE - - def getRealRowKey(result: Result): Array[Byte] = { - result.getRow.drop(BUCKET_BYTE_SIZE) - } -} - -class DistributedScanner(table: Table, scan: Scan) extends AbstractClientScanner { - import DistributedScanner._ - - private val BYTE_MAX = BigInt(256) - - private[helper] val scanners = { - for { - i <- 0 until BYTE_MAX.pow(BUCKET_BYTE_SIZE).toInt - } yield { - val bucketBytes: Array[Byte] = Bytes.toBytes(i).takeRight(BUCKET_BYTE_SIZE) - val newScan = new Scan(scan).setStartRow(bucketBytes ++ scan.getStartRow).setStopRow(bucketBytes ++ scan.getStopRow) - table.getScanner(newScan) - } - } - - val resultCache = new util.TreeMap[Result, java.util.Iterator[Result]](new Comparator[Result] { - val comparator = SignedBytes.lexicographicalComparator() - override def compare(o1: Result, o2: Result): Int = { - comparator.compare(getRealRowKey(o1), getRealRowKey(o2)) - } - }) - - lazy val initialized = { - val iterators = scanners.map(_.iterator()).filter(_.hasNext) - iterators.foreach { it => - resultCache.put(it.next(), it) - } - iterators.nonEmpty - } - - override def next(): Result = { - if (initialized) { - Option(resultCache.pollFirstEntry()).map { entry => - val it = entry.getValue - if (it.hasNext) { - // fill cache - resultCache.put(it.next(), it) - } - entry.getKey - }.orNull - } else { - null - } - } - - override def close(): Unit = { - for { - scanner <- scanners - } { - scanner.close() - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2counter_core/src/main/scala/s2/helper/HashShardingJedis.scala ---------------------------------------------------------------------- diff --git a/s2counter_core/src/main/scala/s2/helper/HashShardingJedis.scala b/s2counter_core/src/main/scala/s2/helper/HashShardingJedis.scala deleted file mode 100644 index 395486d..0000000 --- a/s2counter_core/src/main/scala/s2/helper/HashShardingJedis.scala +++ /dev/null @@ -1,156 +0,0 @@ -package s2.helper - -import com.typesafe.config.Config -import org.slf4j.LoggerFactory -import redis.clients.jedis.exceptions.JedisException -import redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig} -import s2.config.S2CounterConfig -import s2.util.Hashes - -/** - * Created by jay on 14. 10. 31.. - */ -class HashShardingJedis(config: Config) { - lazy val s2config = new S2CounterConfig(config) - - private val log = LoggerFactory.getLogger(getClass) - - val poolConfig = new JedisPoolConfig() - poolConfig.setMaxTotal(150) - poolConfig.setMaxIdle(50) - poolConfig.setMaxWaitMillis(200) - - val jedisPools = s2config.REDIS_INSTANCES.map { case (host, port) => - new JedisPool(poolConfig, host, port) - } - val jedisPoolSize = jedisPools.size - - def getJedisPool(idx: Int): JedisPool = { - if(idx >= jedisPoolSize) - null - else - jedisPools(idx) - } - - def getJedisPoolWithBucketname2(bucket: String): JedisPool = { - val hashedValue = Hashes.murmur3(bucket) - val idx = hashedValue % jedisPoolSize - getJedisPool(idx) - } - - def getJedisPoolWithBucketname(bucket: String): (JedisPool, JedisPool) = { - val hashedValue = Hashes.murmur3(bucket) - val idx = hashedValue % jedisPoolSize - val secondaryIdx = if (jedisPoolSize <= 1) { - throw new Exception("too small sharding pool <= 1") - } else { - val newIdx = (hashedValue / jedisPoolSize) % (jedisPoolSize -1) - if(newIdx < idx) { - newIdx - } else { - newIdx +1 - } - } - (getJedisPool(idx), getJedisPool(secondaryIdx)) - } - - def doBlockWithJedisInstace(f : Jedis => Any, fallBack : => Any, jedis : Jedis) = { - try { - f(jedis) - } - catch { - case e:JedisException => { - fallBack - } - } - } - - def doBlockWithBucketName(f : Jedis => Any, fallBack : => Any, bucket : String) = { -// Logger.debug(s"start jedis do block") - //val (jedis_pool1, jedis_pool2) = getJedisPoolWithBucketname(bucket) - val jedis_pool1= getJedisPoolWithBucketname2(bucket) -// if(jedis_pool1 != null && jedis_pool2 != null) { - if(jedis_pool1 != null) { - var jedis1: Jedis = null -// var jedis2: Jedis = null - try { - jedis1 = jedis_pool1.getResource() -// jedis2 = jedis_pool2.getResource() - log.info(s">> Jedis Pool Active Num : ${jedis_pool1.getNumActive}") - - /* val f1 = Future(f(jedis1)) - val f2 = Future(f(jedis2)) - - val mixedFuture = Future.sequence(List(f1,f2)) */ - - val r1 = f(jedis1) - //val r2 = f(jedis2) - - r1 - } - catch { - case e:JedisException => { -// Logger.debug(s"following exception catched") -// Logger.debug(s"${e}") - jedis_pool1.returnBrokenResource(jedis1) -// jedis_pool2.returnBrokenResource(jedis2) - - jedis1 = null -// jedis2 = null - fallBack - } - } - finally { - if (jedis1 != null) jedis_pool1.returnResource(jedis1) -// if (jedis2 != null) jedis_pool2.returnResource(jedis2) - } - } - else{ -// Logger.debug(s"fallback executed") - fallBack - } - } - - def doBlockWithKey[T](key: String)(f: Jedis => T)(fallBack: => T) = { -// Logger.debug(s"start jedis do block") - val (jedis_pool1, jedis_pool2) = getJedisPoolWithBucketname(key) - if(jedis_pool1 != null && jedis_pool2 != null) { - var jedis1: Jedis = null - var jedis2: Jedis = null - try { - jedis1 = jedis_pool1.getResource() - jedis2 = jedis_pool2.getResource() - - /* val f1 = Future(f(jedis1)) - val f2 = Future(f(jedis2)) - - val mixedFuture = Future.sequence(List(f1,f2)) */ - - val r1 = f(jedis1) - //val r2 = f(jedis2) - - r1 - } - catch { - case e:JedisException => { -// Logger.debug(s"following exception catched") -// Logger.debug(s"${e}") - jedis_pool1.returnBrokenResource(jedis1) - jedis_pool2.returnBrokenResource(jedis2) - - jedis1 = null - jedis2 = null - fallBack - } - } - finally { - if (jedis1 != null) jedis_pool1.returnResource(jedis1) - if (jedis2 != null) jedis_pool2.returnResource(jedis2) - } - } - else{ -// Logger.debug(s"fallback executed") - fallBack - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2counter_core/src/main/scala/s2/helper/Management.scala ---------------------------------------------------------------------- diff --git a/s2counter_core/src/main/scala/s2/helper/Management.scala b/s2counter_core/src/main/scala/s2/helper/Management.scala deleted file mode 100644 index 266caba..0000000 --- a/s2counter_core/src/main/scala/s2/helper/Management.scala +++ /dev/null @@ -1,146 +0,0 @@ -package s2.helper - -import com.typesafe.config.Config -import org.apache.hadoop.hbase.client.{Admin, ConnectionFactory, Durability} -import org.apache.hadoop.hbase.io.compress.Compression -import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding -import org.apache.hadoop.hbase.regionserver.BloomType -import org.apache.hadoop.hbase.util.Bytes -import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HTableDescriptor, TableName} -import org.slf4j.LoggerFactory -import redis.clients.jedis.ScanParams - -import scala.collection.JavaConversions._ -import scala.util.Random - -/** -* Created by hsleep([email protected]) on 15. 3. 30.. -*/ -class Management(config: Config) { - val withRedis = new HashShardingJedis(config) - - val log = LoggerFactory.getLogger(this.getClass) - - def describe(zkAddr: String, tableName: String) = { - val admin = getAdmin(zkAddr) - val table = admin.getTableDescriptor(TableName.valueOf(tableName)) - - table.getColumnFamilies.foreach { cf => - println(s"columnFamily: ${cf.getNameAsString}") - cf.getValues.foreach { case (k, v) => - println(s"${Bytes.toString(k.get())} ${Bytes.toString(v.get())}") - } - } - } - - def setTTL(zkAddr: String, tableName: String, cfName: String, ttl: Int) = { - val admin = getAdmin(zkAddr) - val tableNameObj = TableName.valueOf(tableName) - val table = admin.getTableDescriptor(tableNameObj) - - val cf = table.getFamily(cfName.getBytes) - cf.setTimeToLive(ttl) - - admin.modifyColumn(tableNameObj, cf) - } - - def getAdmin(zkAddr: String): Admin = { - val conf = HBaseConfiguration.create() - conf.set("hbase.zookeeper.quorum", zkAddr) - val conn = ConnectionFactory.createConnection(conf) - conn.getAdmin - } - - def tableExists(zkAddr: String, tableName: String): Boolean = { - getAdmin(zkAddr).tableExists(TableName.valueOf(tableName)) - } - - def createTable(zkAddr: String, tableName: String, cfs: List[String], regionMultiplier: Int) = { - log.info(s"create table: $tableName on $zkAddr, $cfs, $regionMultiplier") - val admin = getAdmin(zkAddr) - val regionCount = admin.getClusterStatus.getServersSize * regionMultiplier - try { - val desc = new HTableDescriptor(TableName.valueOf(tableName)) - desc.setDurability(Durability.ASYNC_WAL) - for (cf <- cfs) { - val columnDesc = new HColumnDescriptor(cf) - .setCompressionType(Compression.Algorithm.LZ4) - .setBloomFilterType(BloomType.ROW) - .setDataBlockEncoding(DataBlockEncoding.FAST_DIFF) - .setMaxVersions(1) - .setMinVersions(0) - .setBlocksize(32768) - .setBlockCacheEnabled(true) - desc.addFamily(columnDesc) - } - - if (regionCount <= 1) admin.createTable(desc) - else admin.createTable(desc, getStartKey(regionCount), getEndKey(regionCount), regionCount) - } catch { - case e: Throwable => - log.error(s"$zkAddr, $tableName failed with $e", e) - throw e - } - } - - // we only use murmur hash to distribute row key. - private def getStartKey(regionCount: Int) = { - Bytes.toBytes(Int.MaxValue / regionCount) - } - - private def getEndKey(regionCount: Int) = { - Bytes.toBytes(Int.MaxValue / regionCount * (regionCount - 1)) - } - - case class RedisScanIterator(scanParams: ScanParams = new ScanParams().count(100)) extends Iterator[String] { - val nextCursorId: collection.mutable.Map[Int, String] = collection.mutable.Map.empty[Int, String] - var innerIterator: Iterator[String] = _ - - for { - i <- 0 until withRedis.jedisPoolSize - } { - nextCursorId.put(i, "0") - } - - def callScan(): Unit = { - if (nextCursorId.nonEmpty) { -// println(s"callScan: idx: $nextIdx, cursor: $nextCursorId") - val idx = Random.shuffle(nextCursorId.keys).head - val cursorId = nextCursorId(idx) - val pool = withRedis.getJedisPool(idx) - val conn = pool.getResource - try { - val result = conn.scan(cursorId, scanParams) - result.getStringCursor match { - case "0" => - log.debug(s"end scan: idx: $idx, cursor: $cursorId") - nextCursorId.remove(idx) - case x: String => - nextCursorId.put(idx, x) - } - innerIterator = result.getResult.toIterator - } finally { - pool.returnResource(conn) - } - } - else { - innerIterator = List.empty[String].toIterator - } - } - - // initialize - callScan() - - override def hasNext: Boolean = { - innerIterator.hasNext match { - case true => - true - case false => - callScan() - innerIterator.hasNext - } - } - - override def next(): String = innerIterator.next() - } -} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2counter_core/src/main/scala/s2/helper/WithHBase.scala ---------------------------------------------------------------------- diff --git a/s2counter_core/src/main/scala/s2/helper/WithHBase.scala b/s2counter_core/src/main/scala/s2/helper/WithHBase.scala deleted file mode 100644 index ae3ff1e..0000000 --- a/s2counter_core/src/main/scala/s2/helper/WithHBase.scala +++ /dev/null @@ -1,101 +0,0 @@ -package s2.helper - -import com.stumbleupon.async.{Callback, Deferred} -import com.typesafe.config.Config -import org.apache.hadoop.hbase.client._ -import org.apache.hadoop.hbase.{HBaseConfiguration, TableName} -import org.hbase.async.HBaseClient -import org.slf4j.LoggerFactory -import s2.config.S2CounterConfig - -import scala.concurrent.{Future, Promise} -import scala.util.Try - -/** - * Created by hsleep([email protected]) on 15. 6. 19.. - */ -class WithHBase(config: Config) { - lazy val logger = LoggerFactory.getLogger(this.getClass) - lazy val s2config = new S2CounterConfig(config) - - lazy val zkQuorum = s2config.HBASE_ZOOKEEPER_QUORUM - lazy val defaultTableName = s2config.HBASE_TABLE_NAME - - logger.info(s"$zkQuorum, $defaultTableName") - - val hbaseConfig = HBaseConfiguration.create() - s2config.getConfigMap("hbase").foreach { case (k, v) => - hbaseConfig.set(k, v) - } - -// lazy val conn: HConnection = HConnectionManager.createConnection(hbaseConfig) - lazy val conn: Connection = ConnectionFactory.createConnection(hbaseConfig) - - val writeBufferSize = 1024 * 1024 * 2 // 2MB - -// def apply[T](op: Table => T): Try[T] = { -// Try { -// val table = conn.getTable(TableName.valueOf(defaultTableName)) -// // do not keep failed operation in writer buffer -// table.setWriteBufferSize(writeBufferSize) -// try { -// op(table) -// } catch { -// case e: Throwable => -// logger.error(s"Operation to table($defaultTableName) is failed: ${e.getMessage}") -// throw e -// } finally { -// table.close() -// } -// } -// } - - def apply[T](tableName: String)(op: Table => T): Try[T] = { - Try { - val table = conn.getTable(TableName.valueOf(tableName)) - // do not keep failed operation in writer buffer - table.setWriteBufferSize(writeBufferSize) - try { - op(table) - } catch { - case ex: Exception => - logger.error(s"$ex: Operation to table($tableName) is failed") - throw ex - } finally { - table.close() - } - } - } -} - -case class WithAsyncHBase(config: Config) { - lazy val logger = LoggerFactory.getLogger(this.getClass) - lazy val s2config = new S2CounterConfig(config) - - lazy val zkQuorum = s2config.HBASE_ZOOKEEPER_QUORUM - - val hbaseConfig = HBaseConfiguration.create() - s2config.getConfigMap("hbase").foreach { case (k, v) => - hbaseConfig.set(k, v) - } - -// lazy val conn: HConnection = HConnectionManager.createConnection(hbaseConfig) - lazy val client: HBaseClient = new HBaseClient(zkQuorum) - - val writeBufferSize = 1024 * 1024 * 2 // 2MB - - def apply[T](op: HBaseClient => Deferred[T]): Future[T] = { - val promise = Promise[T]() - - op(client).addCallback(new Callback[Unit, T] { - def call(arg: T): Unit = { - promise.success(arg) - } - }).addErrback(new Callback[Unit, Exception] { - def call(ex: Exception): Unit = { - promise.failure(ex) - } - }) - promise.future - } -} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2counter_core/src/main/scala/s2/helper/WithRedis.scala ---------------------------------------------------------------------- diff --git a/s2counter_core/src/main/scala/s2/helper/WithRedis.scala b/s2counter_core/src/main/scala/s2/helper/WithRedis.scala deleted file mode 100644 index 2046577..0000000 --- a/s2counter_core/src/main/scala/s2/helper/WithRedis.scala +++ /dev/null @@ -1,62 +0,0 @@ -package s2.helper - -import com.typesafe.config.Config -import org.slf4j.LoggerFactory -import redis.clients.jedis.exceptions.JedisException -import redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig} -import s2.config.S2CounterConfig -import s2.util.Hashes - -import scala.util.Try - -/** - * Created by hsleep([email protected]) on 15. 6. 19.. - */ -class WithRedis(config: Config) { - lazy val s2config = new S2CounterConfig(config) - - private val log = LoggerFactory.getLogger(getClass) - - val poolConfig = new JedisPoolConfig() - poolConfig.setMaxTotal(150) - poolConfig.setMaxIdle(50) - poolConfig.setMaxWaitMillis(200) - - val jedisPools = s2config.REDIS_INSTANCES.map { case (host, port) => - new JedisPool(poolConfig, host, port) - } - - def getBucketIdx(key: String): Int = { - Hashes.murmur3(key) % jedisPools.size - } - - def doBlockWithIndex[T](idx: Int)(f: Jedis => T): Try[T] = { - Try { - val pool = jedisPools(idx) - - var jedis: Jedis = null - - try { - jedis = pool.getResource - - f(jedis) - } - catch { - case e: JedisException => - pool.returnBrokenResource(jedis) - - jedis = null - throw e - } - finally { - if (jedis != null) { - pool.returnResource(jedis) - } - } - } - } - - def doBlockWithKey[T](key: String)(f: Jedis => T): Try[T] = { - doBlockWithIndex(getBucketIdx(key))(f) - } -} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2counter_core/src/main/scala/s2/models/CachedDBModel.scala ---------------------------------------------------------------------- diff --git a/s2counter_core/src/main/scala/s2/models/CachedDBModel.scala b/s2counter_core/src/main/scala/s2/models/CachedDBModel.scala deleted file mode 100644 index 5da0265..0000000 --- a/s2counter_core/src/main/scala/s2/models/CachedDBModel.scala +++ /dev/null @@ -1,14 +0,0 @@ -package s2.models - -import s2.util.{CollectionCache, CollectionCacheConfig} -import scalikejdbc.AutoSession - -/** - * Created by hsleep([email protected]) on 15. 5. 27.. - */ -trait CachedDBModel[T] { - implicit val s = AutoSession - - val cacheConfig: CollectionCacheConfig - lazy val cache = new CollectionCache[Option[T]](cacheConfig) -} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2counter_core/src/main/scala/s2/models/Counter.scala ---------------------------------------------------------------------- diff --git a/s2counter_core/src/main/scala/s2/models/Counter.scala b/s2counter_core/src/main/scala/s2/models/Counter.scala deleted file mode 100644 index e26e071..0000000 --- a/s2counter_core/src/main/scala/s2/models/Counter.scala +++ /dev/null @@ -1,213 +0,0 @@ -package s2.models - -import com.typesafe.config.Config -import s2.config.S2CounterConfig -import s2.util.{CollectionCache, CollectionCacheConfig} -import scalikejdbc._ - -/** -* Created by hsleep([email protected]) on 15. 1. 30.. -*/ -case class Counter(id: Int, useFlag: Boolean, version: Byte, service: String, action: String, - itemType: Counter.ItemType.ItemType, autoComb: Boolean, dimension: String, - useProfile: Boolean, bucketImpId: Option[String], - useRank: Boolean, - ttl: Int, dailyTtl: Option[Int], hbaseTable: Option[String], - intervalUnit: Option[String], - rateActionId: Option[Int], rateBaseId: Option[Int], rateThreshold: Option[Int]) { - val intervals: Array[String] = intervalUnit.map(s => s.split(',')).getOrElse(Array("t", "M", "d", "H")) - val dimensionSp = if (dimension.isEmpty) Array.empty[String] else dimension.split(',').sorted - - val dimensionList: List[Array[String]] = { - autoComb match { - case true => - for { - i <- (0 to math.min(4, dimensionSp.length)).toList - combines <- dimensionSp.combinations(i) - } yield { - combines - } - case false => - dimensionSp isEmpty match { - case true => List(Array()) - case false => dimensionSp.toList.map(sp => sp.split('.')) - } - } - } - - val dimensionSet: Set[Set[String]] = { - for { - arr <- dimensionList - } yield { - arr.toSet - } - }.toSet - - val isRateCounter: Boolean = rateActionId.isDefined && rateBaseId.isDefined && rateActionId != rateBaseId - val isTrendCounter: Boolean = rateActionId.isDefined && rateBaseId.isDefined && rateActionId == rateBaseId -} - -object Counter extends SQLSyntaxSupport[Counter] { - object ItemType extends Enumeration { - type ItemType = Value - val INT, LONG, STRING, BLOB = Value - } - - def apply(c: SyntaxProvider[Counter])(rs: WrappedResultSet): Counter = apply(c.resultName)(rs) - def apply(r: ResultName[Counter])(rs: WrappedResultSet): Counter = { - lazy val itemType = Counter.ItemType(rs.int(r.itemType)) - Counter(rs.int(r.id), rs.boolean(r.useFlag), rs.byte(r.version), rs.string(r.service), rs.string(r.action), - itemType, rs.boolean(r.autoComb), rs.string(r.dimension), - rs.boolean(r.useProfile), rs.stringOpt(r.bucketImpId), - rs.boolean(r.useRank), - rs.int(r.ttl), rs.intOpt(r.dailyTtl), rs.stringOpt(r.hbaseTable), rs.stringOpt(r.intervalUnit), - rs.intOpt(r.rateActionId), rs.intOpt(r.rateBaseId), rs.intOpt(r.rateThreshold)) - } - - def apply(useFlag: Boolean, version: Byte, service: String, action: String, itemType: Counter.ItemType.ItemType, - autoComb: Boolean, dimension: String, useProfile: Boolean = false, bucketImpId: Option[String] = None, - useRank: Boolean = false, ttl: Int = 259200, dailyTtl: Option[Int] = None, - hbaseTable: Option[String] = None, intervalUnit: Option[String] = None, - rateActionId: Option[Int] = None, rateBaseId: Option[Int] = None, rateThreshold: Option[Int] = None): Counter = { - Counter(-1, useFlag, version, service, action, itemType, autoComb, dimension, - useProfile, bucketImpId, - useRank, ttl, dailyTtl, hbaseTable, - intervalUnit, rateActionId, rateBaseId, rateThreshold) - } -} - -class CounterModel(config: Config) extends CachedDBModel[Counter] { - private lazy val s2Config = new S2CounterConfig(config) - // enable negative cache - override val cacheConfig: CollectionCacheConfig = - new CollectionCacheConfig(s2Config.CACHE_MAX_SIZE, s2Config.CACHE_TTL_SECONDS, - negativeCache = true, s2Config.CACHE_NEGATIVE_TTL_SECONDS) - - val c = Counter.syntax("c") - val r = c.result - - val multiCache = new CollectionCache[Seq[Counter]](cacheConfig) - - def findById(id: Int, useCache: Boolean = true): Option[Counter] = { - lazy val sql = withSQL { - selectFrom(Counter as c).where.eq(c.id, id).and.eq(c.useFlag, 1) - }.map(Counter(c)) - - if (useCache) { - cache.withCache(s"_id:$id") { - sql.single().apply() - } - } else { - sql.single().apply() - } - } - - def findByServiceAction(service: String, action: String, useCache: Boolean = true): Option[Counter] = { - lazy val sql = withSQL { - selectFrom(Counter as c).where.eq(c.service, service).and.eq(c.action, action).and.eq(c.useFlag, 1) - }.map(Counter(c)) - - if (useCache) { - cache.withCache(s"$service.$action") { - sql.single().apply() - } - } - else { - sql.single().apply() - } - } - - def findByRateActionId(rateActionId: Int, useCache: Boolean = true): Seq[Counter] = { - lazy val sql = withSQL { - selectFrom(Counter as c).where.eq(c.rateActionId, rateActionId).and.ne(c.rateBaseId, rateActionId).and.eq(c.useFlag, 1) - }.map(Counter(c)) - - if (useCache) { - multiCache.withCache(s"_rate_action_id.$rateActionId") { - sql.list().apply() - } - } else { - sql.list().apply() - } - } - - def findByRateBaseId(rateBaseId: Int, useCache: Boolean = true): Seq[Counter] = { - lazy val sql = withSQL { - selectFrom(Counter as c).where.eq(c.rateBaseId, rateBaseId).and.ne(c.rateActionId, rateBaseId).and.eq(c.useFlag, 1) - }.map(Counter(c)) - - if (useCache) { - multiCache.withCache(s"_rate_base_id.$rateBaseId") { - sql.list().apply() - } - } else { - sql.list().apply() - } - } - - def findByTrendActionId(trendActionId: Int, useCache: Boolean = true): Seq[Counter] = { - lazy val sql = withSQL { - selectFrom(Counter as c).where.eq(c.rateActionId, trendActionId).and.eq(c.rateBaseId, trendActionId).and.eq(c.useFlag, 1) - }.map(Counter(c)) - - if (useCache) { - multiCache.withCache(s"_trend_action_id.$trendActionId") { - sql.list().apply() - } - } else { - sql.list().apply() - } - } - - def createServiceAction(policy: Counter): Unit = { - withSQL { - val c = Counter.column - insert.into(Counter).namedValues( - c.useFlag -> policy.useFlag, - c.version -> policy.version, - c.service -> policy.service, - c.action -> policy.action, - c.itemType -> policy.itemType.id, - c.autoComb -> policy.autoComb, - c.dimension -> policy.dimension, - c.useProfile -> policy.useProfile, - c.bucketImpId -> policy.bucketImpId, - c.useRank -> policy.useRank, - c.ttl -> policy.ttl, - c.dailyTtl -> policy.dailyTtl, - c.hbaseTable -> policy.hbaseTable, - c.intervalUnit -> policy.intervalUnit, - c.rateActionId -> policy.rateActionId, - c.rateBaseId -> policy.rateBaseId, - c.rateThreshold -> policy.rateThreshold - ) - }.update().apply() - } - - def updateServiceAction(policy: Counter): Unit = { - withSQL { - val c = Counter.column - update(Counter).set( - c.autoComb -> policy.autoComb, - c.dimension -> policy.dimension, - c.useProfile -> policy.useProfile, - c.bucketImpId -> policy.bucketImpId, - c.useRank -> policy.useRank, - c.intervalUnit -> policy.intervalUnit, - c.rateActionId -> policy.rateActionId, - c.rateBaseId -> policy.rateBaseId, - c.rateThreshold -> policy.rateThreshold - ).where.eq(c.id, policy.id) - }.update().apply() - } - - def deleteServiceAction(policy: Counter): Unit = { - withSQL { - val c = Counter.column - update(Counter).set( - c.action -> s"deleted_${System.currentTimeMillis()}_${policy.action}", - c.useFlag -> false - ).where.eq(c.id, policy.id) - }.update().apply() - } -} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2counter_core/src/main/scala/s2/models/DBModel.scala ---------------------------------------------------------------------- diff --git a/s2counter_core/src/main/scala/s2/models/DBModel.scala b/s2counter_core/src/main/scala/s2/models/DBModel.scala deleted file mode 100644 index 6cb34b9..0000000 --- a/s2counter_core/src/main/scala/s2/models/DBModel.scala +++ /dev/null @@ -1,27 +0,0 @@ -package s2.models - -import com.typesafe.config.Config -import s2.config.S2CounterConfig -import scalikejdbc._ - -/** - * Created by alec on 15. 3. 31.. - */ -object DBModel { - private var initialized = false - - def initialize(config: Config): Unit = { - if (!initialized) { - this synchronized { - if (!initialized) { - val s2Config = new S2CounterConfig(config) - Class.forName(s2Config.DB_DEFAULT_DRIVER) - val settings = ConnectionPoolSettings(initialSize = 0, maxSize = 10, connectionTimeoutMillis = 5000L, validationQuery = "select 1;") - - ConnectionPool.singleton(s2Config.DB_DEFAULT_URL, s2Config.DB_DEFAULT_USER, s2Config.DB_DEFAULT_PASSWORD, settings) - initialized = true - } - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2counter_core/src/main/scala/s2/util/CartesianProduct.scala ---------------------------------------------------------------------- diff --git a/s2counter_core/src/main/scala/s2/util/CartesianProduct.scala b/s2counter_core/src/main/scala/s2/util/CartesianProduct.scala deleted file mode 100644 index 2077e3f..0000000 --- a/s2counter_core/src/main/scala/s2/util/CartesianProduct.scala +++ /dev/null @@ -1,11 +0,0 @@ -package s2.util - -/** - * Created by hsleep([email protected]) on 15. 6. 19.. - */ -object CartesianProduct { - def apply[T](xss: List[List[T]]): List[List[T]] = xss match { - case Nil => List(Nil) - case h :: t => for(xh <- h; xt <- apply(t)) yield xh :: xt - } -} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2counter_core/src/main/scala/s2/util/CollectionCache.scala ---------------------------------------------------------------------- diff --git a/s2counter_core/src/main/scala/s2/util/CollectionCache.scala b/s2counter_core/src/main/scala/s2/util/CollectionCache.scala deleted file mode 100644 index 122f87a..0000000 --- a/s2counter_core/src/main/scala/s2/util/CollectionCache.scala +++ /dev/null @@ -1,69 +0,0 @@ -package s2.util - -import java.net.InetAddress -import java.util.concurrent.TimeUnit - -import com.google.common.cache.{Cache, CacheBuilder} -import org.slf4j.LoggerFactory - -import scala.concurrent.{ExecutionContext, Future} -import scala.language.{postfixOps, reflectiveCalls} - -/** - * Created by hsleep([email protected]) on 15. 7. 1.. - */ -case class CollectionCacheConfig(maxSize: Int, ttl: Int, negativeCache: Boolean = false, negativeTTL: Int = 600) - -class CollectionCache[C <: { def nonEmpty: Boolean; def isEmpty: Boolean } ](config: CollectionCacheConfig) { - private val cache: Cache[String, C] = CacheBuilder.newBuilder() - .expireAfterWrite(config.ttl, TimeUnit.SECONDS) - .maximumSize(config.maxSize) - .build[String, C]() - -// private lazy val cache = new SynchronizedLruMap[String, (C, Int)](config.maxSize) - private lazy val className = this.getClass.getSimpleName - - private lazy val log = LoggerFactory.getLogger(this.getClass) - val localHostname = InetAddress.getLocalHost.getHostName - - def size = cache.size - val maxSize = config.maxSize - - // cache statistics - def getStatsString: String = { - s"$localHostname ${cache.stats().toString}" - } - - def withCache(key: String)(op: => C): C = { - Option(cache.getIfPresent(key)) match { - case Some(r) => r - case None => - val r = op - if (r.nonEmpty || config.negativeCache) { - cache.put(key, r) - } - r - } - } - - def withCacheAsync(key: String)(op: => Future[C])(implicit ec: ExecutionContext): Future[C] = { - Option(cache.getIfPresent(key)) match { - case Some(r) => Future.successful(r) - case None => - op.map { r => - if (r.nonEmpty || config.negativeCache) { - cache.put(key, r) - } - r - } - } - } - - def purgeKey(key: String) = { - cache.invalidate(key) - } - - def contains(key: String): Boolean = { - Option(cache.getIfPresent(key)).nonEmpty - } -}
