http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/TimedQualifier.scala ---------------------------------------------------------------------- diff --git a/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/TimedQualifier.scala b/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/TimedQualifier.scala new file mode 100644 index 0000000..ad2bea2 --- /dev/null +++ b/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/TimedQualifier.scala @@ -0,0 +1,215 @@ +package org.apache.s2graph.counter.core + +import java.text.SimpleDateFormat +import java.util.Calendar + +case class TimedQualifier(q: TimedQualifier.IntervalUnit.Value, ts: Long) { + import TimedQualifier.IntervalUnit._ + + def dateTime: Long = { + val dateFormat = new SimpleDateFormat("yyyyMMddHHmm") + dateFormat.format(ts).toLong + } + + def add(amount: Int): TimedQualifier = { + val cal = Calendar.getInstance() + cal.setTimeInMillis(ts) + q match { + case MINUTELY => + cal.add(Calendar.MINUTE, amount) + case HOURLY => + cal.add(Calendar.HOUR, amount) + case DAILY => + cal.add(Calendar.DAY_OF_MONTH, amount) + case MONTHLY => + cal.add(Calendar.MONTH, amount) + case TOTAL => + } + copy(ts = cal.getTimeInMillis) + } +} + +object TimedQualifier { + object IntervalUnit extends Enumeration { + type IntervalUnit = Value + val TOTAL = Value("t") + val MONTHLY = Value("M") + val DAILY = Value("d") + val HOURLY = Value("H") + val MINUTELY = Value("m") + } + + def apply(q: String, ts: Long): TimedQualifier = TimedQualifier(IntervalUnit.withName(q), ts) + + import IntervalUnit._ + + def getTsUnit(intervalUnit: IntervalUnit.IntervalUnit): Long = { + intervalUnit match { + case MINUTELY => 1 * 60 * 1000l + case HOURLY => 60 * 60 * 1000l + case DAILY => 24 * 60 * 60 * 1000l + case MONTHLY => 31 * 24 * 60 * 60 * 1000l + case v: IntervalUnit.IntervalUnit => + throw new RuntimeException(s"unsupported operation for ${v.toString}") + } + } + + def getQualifiers(intervals: Seq[IntervalUnit], millis: Long): Seq[TimedQualifier] = { + val cal = Calendar.getInstance() + cal.setTimeInMillis(millis) + + val newCal = Calendar.getInstance() + newCal.set(cal.get(Calendar.YEAR), cal.get(Calendar.MONTH), 1, 0, 0, 0) + newCal.set(Calendar.MILLISECOND, 0) + val month = newCal.getTimeInMillis + val Seq(day, hour, minute) = { + for { + field <- Seq(Calendar.DATE, Calendar.HOUR_OF_DAY, Calendar.MINUTE) + } yield { + newCal.set(field, cal.get(field)) + newCal.getTimeInMillis + } + } + + for { + interval <- intervals + } yield { + val ts = interval match { + case MINUTELY => minute + case HOURLY => hour + case DAILY => day + case MONTHLY => month + case TOTAL => 0L + } + TimedQualifier(interval, ts) + } + } + + // descending order + def getQualifiersToLimit(intervals: Seq[IntervalUnit], limit: Int, tsOpt: Option[Long] = None): Seq[TimedQualifier] = { + val ts = tsOpt.getOrElse(System.currentTimeMillis()) + for { + interval <- intervals + newLimit = if (interval == TOTAL) 1 else limit + i <- 0 until (-newLimit, -1) + } yield { + val newMillis = nextTime(interval, ts, i) + TimedQualifier(interval, newMillis) + } + } + + private def nextTime(interval: IntervalUnit, ts: Long, i: Int): Long = { + val newCal = Calendar.getInstance() + newCal.setTimeInMillis(ts) + newCal.set(Calendar.MILLISECOND, 0) + interval match { + case MINUTELY => + newCal.set(Calendar.SECOND, 0) + newCal.add(Calendar.MINUTE, i) + newCal.getTimeInMillis + case HOURLY => + 
newCal.set(Calendar.SECOND, 0) + newCal.set(Calendar.MINUTE, 0) + newCal.add(Calendar.HOUR_OF_DAY, i) + newCal.getTimeInMillis + case DAILY => + newCal.set(Calendar.SECOND, 0) + newCal.set(Calendar.MINUTE, 0) + newCal.set(Calendar.HOUR_OF_DAY, 0) + newCal.add(Calendar.DAY_OF_MONTH, i) + newCal.getTimeInMillis + case MONTHLY => + newCal.set(Calendar.SECOND, 0) + newCal.set(Calendar.MINUTE, 0) + newCal.set(Calendar.HOUR_OF_DAY, 0) + newCal.set(Calendar.DAY_OF_MONTH, 1) + newCal.add(Calendar.MONTH, i) + newCal.getTimeInMillis + case TOTAL => + 0L + } + } + + def getTimeList(interval: IntervalUnit, from: Long, to: Long, rst: List[Long] = Nil): List[Long] = { + interval match { + case TOTAL => List(0) + case _ => + val next = nextTime(interval, from, 1) + if (next < from) { + // still before the range start; skip without collecting + getTimeList(interval, next, to, rst) + } + else if (next < to) { + // recursive call, collecting this step + getTimeList(interval, next, to, rst :+ next) + } else { + // end condition + rst :+ next + } + } + } + + // for the reader (query) side + def getQualifiersToLimit(intervals: Seq[IntervalUnit], + limit: Int, + optFrom: Option[Long], + optTo: Option[Long]): Seq[List[TimedQualifier]] = { + val newLimit = limit - 1 + for { + interval <- intervals + } yield { + { + (optFrom, optTo) match { + case (Some(from), Some(to)) => + getTimeList(interval, from, to) + case (Some(from), None) => + getTimeList(interval, from, nextTime(interval, from, newLimit)) + case (None, Some(to)) => + getTimeList(interval, nextTime(interval, to, -newLimit), to) + case (None, None) => + val current = System.currentTimeMillis() + getTimeList(interval, nextTime(interval, current, -newLimit), current) + } + }.map { ts => + TimedQualifier(interval, ts) + } + } + } + + def getTimeRange(intervals: Seq[IntervalUnit], + limit: Int, + optFrom: Option[Long], + optTo: Option[Long]): Seq[(TimedQualifier, TimedQualifier)] = { + val newLimit = limit - 1 + val maxInterval = intervals.maxBy { + case MINUTELY => 0 + case HOURLY => 1 + case DAILY => 2 + case MONTHLY => 3 + case TOTAL => 4 + } + val minInterval = intervals.minBy { + case MINUTELY => 0 + case HOURLY => 1 + case DAILY => 2 + case MONTHLY => 3 + case TOTAL => 4 + } + val (from, to) = (optFrom, optTo) match { + case (Some(f), Some(t)) => + (f, t) + case (Some(f), None) => + (f, nextTime(minInterval, f, newLimit)) + case (None, Some(t)) => + (nextTime(maxInterval, t, -newLimit), t) + case (None, None) => + val current = System.currentTimeMillis() + (nextTime(maxInterval, current, -newLimit), nextTime(minInterval, current, 0)) + } + for { + interval <- intervals + } yield { + (TimedQualifier(interval, from), TimedQualifier(interval, to)) + } + } +}
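[Editor's note: the diff above carries no prose, so for orientation here is a minimal usage sketch of the TimedQualifier API it adds. It is illustrative only and not part of the commit; TimedQualifierSketch is a hypothetical driver, and only the class defined above is assumed.]

import org.apache.s2graph.counter.core.TimedQualifier
import org.apache.s2graph.counter.core.TimedQualifier.IntervalUnit

object TimedQualifierSketch extends App {
  val now = System.currentTimeMillis()
  // Descending order: the current hour, the two previous hours, plus the single TOTAL bucket (ts = 0).
  val qualifiers = TimedQualifier.getQualifiersToLimit(Seq(IntervalUnit.HOURLY, IntervalUnit.TOTAL), 3, Some(now))
  qualifiers.foreach(q => println(s"${q.q} ts=${q.ts} dateTime=${q.dateTime}"))

  // add() shifts a qualifier by whole intervals of its own unit, e.g. back one hour
  // (the raw ts is shifted; it is not truncated to an hour boundary).
  val previousHour = TimedQualifier(IntervalUnit.HOURLY, now).add(-1)
  println(previousHour.dateTime)
}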
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/v1/BytesUtilV1.scala ---------------------------------------------------------------------- diff --git a/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/v1/BytesUtilV1.scala b/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/v1/BytesUtilV1.scala new file mode 100644 index 0000000..eaef60d --- /dev/null +++ b/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/v1/BytesUtilV1.scala @@ -0,0 +1,60 @@ +package org.apache.s2graph.counter.core.v1 + +import org.apache.hadoop.hbase.util.Bytes +import org.apache.s2graph.counter.core.TimedQualifier.IntervalUnit +import org.apache.s2graph.counter.core.{TimedQualifier, ExactQualifier, ExactKeyTrait, BytesUtil} +import org.apache.s2graph.counter.models.Counter.ItemType +import org.apache.s2graph.counter.util.Hashes +import scala.collection.mutable.ArrayBuffer + +object BytesUtilV1 extends BytesUtil { + // ExactKey: [hash(2b)][policy(4b)][item(variable)] + val BUCKET_BYTE_SIZE = Bytes.SIZEOF_SHORT + val POLICY_ID_SIZE = Bytes.SIZEOF_INT + val INTERVAL_SIZE = Bytes.SIZEOF_BYTE + val TIMESTAMP_SIZE = Bytes.SIZEOF_LONG + val TIMED_QUALIFIER_SIZE = INTERVAL_SIZE + TIMESTAMP_SIZE + + override def getRowKeyPrefix(id: Int): Array[Byte] = { + Bytes.toBytes(id) + } + + override def toBytes(key: ExactKeyTrait): Array[Byte] = { + val buff = new ArrayBuffer[Byte] + // hash key (2 byte) + buff ++= Bytes.toBytes(Hashes.murmur3(key.itemKey)).take(BUCKET_BYTE_SIZE) + + buff ++= getRowKeyPrefix(key.policyId) + buff ++= { + key.itemType match { + case ItemType.INT => Bytes.toBytes(key.itemKey.toInt) + case ItemType.LONG => Bytes.toBytes(key.itemKey.toLong) + case ItemType.STRING | ItemType.BLOB => Bytes.toBytes(key.itemKey) + } + } + buff.toArray + } + + override def toBytes(eq: ExactQualifier): Array[Byte] = { + toBytes(eq.tq) ++ eq.dimension.getBytes + } + + override def toBytes(tq: TimedQualifier): Array[Byte] = { + Bytes.toBytes(tq.q.toString) ++ Bytes.toBytes(tq.ts) + } + + override def toExactQualifier(bytes: Array[Byte]): ExactQualifier = { + // qualifier layout, in order: interval, ts, dimension + val tq = toTimedQualifier(bytes) + + val dimension = Bytes.toString(bytes, TIMED_QUALIFIER_SIZE, bytes.length - TIMED_QUALIFIER_SIZE) + ExactQualifier(tq, dimension) + } + + override def toTimedQualifier(bytes: Array[Byte]): TimedQualifier = { + val interval = Bytes.toString(bytes, 0, INTERVAL_SIZE) + val ts = Bytes.toLong(bytes, INTERVAL_SIZE) + + TimedQualifier(IntervalUnit.withName(interval), ts) + } +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/v1/ExactStorageAsyncHBase.scala ---------------------------------------------------------------------- diff --git a/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/v1/ExactStorageAsyncHBase.scala b/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/v1/ExactStorageAsyncHBase.scala new file mode 100644 index 0000000..340986f --- /dev/null +++ b/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/v1/ExactStorageAsyncHBase.scala @@ -0,0 +1,317 @@ +package org.apache.s2graph.counter.core.v1 + +import java.util +import com.stumbleupon.async.{Callback, Deferred} +import com.typesafe.config.Config +import org.apache.hadoop.hbase.CellUtil +import org.apache.hadoop.hbase.client._ +import org.apache.hadoop.hbase.util.Bytes +import 
org.apache.s2graph.counter.config.S2CounterConfig +import org.apache.s2graph.counter.core +import org.apache.s2graph.counter.core.ExactCounter.ExactValueMap +import org.apache.s2graph.counter.core._ +import org.apache.s2graph.counter.helper.{Management, WithAsyncHBase, WithHBase} +import org.apache.s2graph.counter.models.Counter +import org.apache.s2graph.counter.models.Counter.ItemType +import org.hbase.async.{KeyValue, ColumnRangeFilter, FilterList, GetRequest} +import org.slf4j.LoggerFactory +import scala.collection.JavaConversions._ +import scala.concurrent.{ExecutionContext, Future} +import scala.util.{Failure, Success} + +class ExactStorageAsyncHBase(config: Config) extends ExactStorage { + import ExactStorageHBase._ + + private val log = LoggerFactory.getLogger(getClass) + + lazy val s2config = new S2CounterConfig(config) + + private[counter] val withHBase = new WithHBase(config) + private[counter] val withAsyncHBase = new WithAsyncHBase(config) + private[counter] val hbaseManagement = new Management(config) + + private def getTableName(policy: Counter): String = { + policy.hbaseTable.getOrElse(s2config.HBASE_TABLE_NAME) + } + + override def get(policy: Counter, + items: Seq[String], + timeRange: Seq[(core.TimedQualifier, core.TimedQualifier)]) + (implicit ex: ExecutionContext): Future[Seq[FetchedCounts]] = { + + val tableName = getTableName(policy) + + lazy val messageForLog = s"${policy.service}.${policy.action} $items $timeRange" + + val keys = { + for { + item <- items + } yield { + ExactKey(policy, item, checkItemType = true) + } + } + + val gets = { + for { + cf <- timeRange.map(t => intervalsMap(t._1.q)).distinct + key <- keys + } yield { + val get = new GetRequest(tableName, BytesUtilV1.toBytes(key)) + get.family(cf.toString) + get.setFilter(new FilterList({ + for { + (from, to) <- timeRange + } yield { + new ColumnRangeFilter( + BytesUtilV1.toBytes(from), true, + BytesUtilV1.toBytes(to.copy(ts = to.ts + 1)), false) + } + }, FilterList.Operator.MUST_PASS_ONE)) + (key, cf, get) + } + } + +// println(s"$messageForLog $gets") + + withAsyncHBase[Seq[FetchedCounts]] { client => + val deferreds: Seq[Deferred[FetchedCounts]] = { + for { + (key, cf, get) <- gets + } yield { + client.get(get).addCallback { new Callback[FetchedCounts, util.ArrayList[KeyValue]] { + override def call(kvs: util.ArrayList[KeyValue]): FetchedCounts = { + val qualifierWithCounts = { + for { + kv <- kvs + eq = BytesUtilV1.toExactQualifier(kv.qualifier()) + } yield { + eq -> Bytes.toLong(kv.value()) + } + }.toMap +// println(s"$key $qualifierWithCounts") + FetchedCounts(key, qualifierWithCounts) + } + }} + } + } + Deferred.group(deferreds).addCallback { new Callback[Seq[FetchedCounts], util.ArrayList[FetchedCounts]] { + override def call(arg: util.ArrayList[FetchedCounts]): Seq[FetchedCounts] = { + for { + (key, fetchedGroup) <- Seq(arg: _*).groupBy(_.exactKey) + } yield { + fetchedGroup.reduce[FetchedCounts] { case (f1, f2) => + FetchedCounts(key, f1.qualifierWithCountMap ++ f2.qualifierWithCountMap) + } + } + }.toSeq + }} + } + } + + override def get(policy: Counter, + items: Seq[String], + timeRange: Seq[(core.TimedQualifier, core.TimedQualifier)], + dimQuery: Map[String, Set[String]]) + (implicit ec: ExecutionContext): Future[Seq[FetchedCountsGrouped]] = { + get(policy, items, timeRange).map { fetchedLs => + for { + FetchedCounts(exactKey, qualifierWithCountMap) <- fetchedLs + } yield { + val intervalWithCountMap = qualifierWithCountMap + .filter { case (eq, v) => eq.checkDimensionEquality(dimQuery) } + 
.groupBy { case (eq, v) => (eq.tq.q, eq.dimKeyValues) } + FetchedCountsGrouped(exactKey, intervalWithCountMap) + } + } + } + + override def update(policy: Counter, counts: Seq[(ExactKeyTrait, ExactValueMap)]): Map[ExactKeyTrait, ExactValueMap] = { + // increment mutation to hbase + val increments = { + for { + (exactKey, values) <- counts + inc = new Increment(BytesUtilV1.toBytes(exactKey)) + } yield { + for { + (eq, value) <- values + } { + inc.addColumn(intervalsMap.apply(eq.tq.q).toString.getBytes, BytesUtilV1.toBytes(eq), value) + } + // add column by dimension + inc + } + } + + val results: Array[Object] = Array.fill(increments.size)(null) + + withHBase(getTableName(policy)) { table => + table.batch(increments, results) + } match { + case Failure(ex) => + log.error(s"${ex.getMessage}") + case _ => + } + + assert(counts.length == results.length) + + for { + ((exactKey, eqWithValue), result) <- counts.zip(results) + } yield { + val eqWithResult = result match { + case r: Result => + for { + (eq, value) <- eqWithValue + } yield { + val interval = eq.tq.q + val cf = intervalsMap(interval) + val result = Option(r.getColumnLatestCell(cf.toString.getBytes, BytesUtilV1.toBytes(eq))).map { cell => + Bytes.toLong(CellUtil.cloneValue(cell)) + }.getOrElse(-1l) + eq -> result + } + case ex: Throwable => + log.error(s"${ex.getMessage}: $exactKey") + Nil + case _ => + log.error(s"result is null: $exactKey") + Nil + } + (exactKey, eqWithResult.toMap) + } + }.toMap + + override def delete(policy: Counter, keys: Seq[ExactKeyTrait]): Unit = { + withHBase(getTableName(policy)) { table => + table.delete { + for { + key <- keys + } yield { + new Delete(BytesUtilV1.toBytes(key)) + } + } + } match { + case Failure(ex) => + log.error(ex.getMessage) + case _ => + } + } + + override def get(policy: Counter, + queries: Seq[(ExactKeyTrait, Seq[core.ExactQualifier])]) + (implicit ex: ExecutionContext): Future[Seq[FetchedCounts]] = { + + val tableName = getTableName(policy) + + val gets = { + for { + (key, eqs) <- queries + (cf, eqsGrouped) <- eqs.groupBy(eq => intervalsMap(eq.tq.q)) + } yield { +// println(s"$key $eqsGrouped") + val get = new GetRequest(tableName, BytesUtilV1.toBytes(key)) + get.family(cf.toString) + get.qualifiers(eqsGrouped.map(BytesUtilV1.toBytes).toArray) + (key, cf, get) + } + } + + withAsyncHBase[Seq[FetchedCounts]] { client => + val deferreds: Seq[Deferred[FetchedCounts]] = { + for { + (key, cf, get) <- gets + } yield { + client.get(get).addCallback { new Callback[FetchedCounts, util.ArrayList[KeyValue]] { + override def call(kvs: util.ArrayList[KeyValue]): FetchedCounts = { + val qualifierWithCounts = { + for { + kv <- kvs + eq = BytesUtilV1.toExactQualifier(kv.qualifier()) + } yield { + eq -> Bytes.toLong(kv.value()) + } + }.toMap + FetchedCounts(key, qualifierWithCounts) + } + }} + } + } + Deferred.group(deferreds).addCallback { new Callback[Seq[FetchedCounts], util.ArrayList[FetchedCounts]] { + override def call(arg: util.ArrayList[FetchedCounts]): Seq[FetchedCounts] = arg + }} + } + } + + override def insertBlobValue(policy: Counter, keys: Seq[BlobExactKey]): Seq[Boolean] = { + val results: Array[Object] = Array.fill(keys.size)(null) + + val puts = keys.map { key => + val put = new Put(BytesUtilV1.toBytes(key)) + put.addColumn(blobCF, blobColumn, key.itemId.getBytes) + } + + withHBase(getTableName(policy)) { table => + table.batch(puts, results) + } match { + case Failure(ex) => + log.error(s"${ex.getMessage}") + case _ => + } + + for { + (result, key) <- results.zip(keys) + } yield { 
+ Option(result).map(_ => true).getOrElse { + log.error(s"fail to insert blob value: $key") + false + } + } + } + + override def getBlobValue(policy: Counter, blobId: String): Option[String] = { + lazy val messageForLog = s"${policy.service}.${policy.action}.$blobId" + + policy.itemType match { + case ItemType.BLOB => + withHBase(getTableName(policy)) { table => + val rowKey = BytesUtilV1.toBytes(ExactKey(policy.id, policy.version, policy.itemType, blobId)) + val get = new Get(rowKey) + get.addColumn(blobCF, blobColumn) + table.get(get) + } match { + case Success(result) => + Option(result).filter(!_.isEmpty).map { rst => + Bytes.toString(rst.getValue(blobCF, blobColumn)) + } + case Failure(ex) => + throw ex + } + case _ => + log.warn(s"is not blob type counter. $messageForLog") + throw new Exception(s"is not blob type counter. $messageForLog") + } + } + + override def prepare(policy: Counter): Unit = { + // create hbase table + policy.hbaseTable.foreach { table => + if (!hbaseManagement.tableExists(s2config.HBASE_ZOOKEEPER_QUORUM, table)) { + hbaseManagement.createTable(s2config.HBASE_ZOOKEEPER_QUORUM, table, + ColumnFamily.values.map(_.toString).toList, 1) + hbaseManagement.setTTL(s2config.HBASE_ZOOKEEPER_QUORUM, table, ColumnFamily.SHORT.toString, policy.ttl) + policy.dailyTtl.foreach { i => + hbaseManagement.setTTL(s2config.HBASE_ZOOKEEPER_QUORUM, table, ColumnFamily.LONG.toString, i * 24 * 60 * 60) + } + } + } + } + + override def destroy(policy: Counter): Unit = { + + } + + override def ready(policy: Counter): Boolean = { + policy.hbaseTable.map { table => + hbaseManagement.tableExists(s2config.HBASE_ZOOKEEPER_QUORUM, table) + }.getOrElse(true) + } +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/v1/ExactStorageHBase.scala ---------------------------------------------------------------------- diff --git a/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/v1/ExactStorageHBase.scala b/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/v1/ExactStorageHBase.scala new file mode 100644 index 0000000..d6d87d3 --- /dev/null +++ b/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/v1/ExactStorageHBase.scala @@ -0,0 +1,326 @@ +package org.apache.s2graph.counter.core.v1 + +import com.typesafe.config.Config +import org.apache.hadoop.hbase.CellUtil +import org.apache.hadoop.hbase.client._ +import org.apache.hadoop.hbase.filter.{ColumnRangeFilter, FilterList} +import org.apache.hadoop.hbase.util.Bytes +import org.apache.s2graph.counter.config.S2CounterConfig +import org.apache.s2graph.counter.core +import org.apache.s2graph.counter.core.ExactCounter.ExactValueMap +import org.apache.s2graph.counter.core._ +import org.apache.s2graph.counter.helper.{Management, WithHBase} +import org.apache.s2graph.counter.models.Counter +import org.apache.s2graph.counter.models.Counter.ItemType +import org.slf4j.LoggerFactory +import scala.collection.JavaConversions._ +import scala.concurrent.{ExecutionContext, Future} +import scala.util.{Failure, Success} + + +class ExactStorageHBase(config: Config) extends ExactStorage { + import ExactStorageHBase._ + + private val log = LoggerFactory.getLogger(getClass) + + lazy val s2config = new S2CounterConfig(config) + + private[counter] val withHBase = new WithHBase(config) + private[counter] val hbaseManagement = new Management(config) + + + + private def getTableName(policy: Counter): String = { + policy.hbaseTable.getOrElse(s2config.HBASE_TABLE_NAME) + 
} + + override def get(policy: Counter, + items: Seq[String], + timeRange: Seq[(core.TimedQualifier, core.TimedQualifier)]) + (implicit ec: ExecutionContext): Future[Seq[FetchedCounts]] = { + lazy val messageForLog = s"${policy.service}.${policy.action} $items $timeRange" + + val keys = { + for { + item <- items + } yield { + ExactKey(policy, item, checkItemType = true) + } + } + + val gets = { + for { + key <- keys + } yield { + val get = new Get(BytesUtilV1.toBytes(key)) + timeRange.map(t => intervalsMap(t._1.q)).distinct.foreach { cf => + get.addFamily(cf.toString.getBytes) + } + get.setFilter(new FilterList(FilterList.Operator.MUST_PASS_ONE, { + for { + (from, to) <- timeRange + } yield { + new ColumnRangeFilter( + BytesUtilV1.toBytes(from), true, + BytesUtilV1.toBytes(to.copy(ts = to.ts + 1)), false) + } + })) + } + } + + // println(s"$messageForLog $gets") + Future { + withHBase(getTableName(policy)) { table => + for { + (rst, key) <- table.get(gets).zip(keys) if !rst.isEmpty + } yield { + val qualifierWithCounts = { + for { + cell <- rst.listCells() + eq = BytesUtilV1.toExactQualifier(CellUtil.cloneQualifier(cell)) + } yield { + eq -> Bytes.toLong(CellUtil.cloneValue(cell)) + } + }.toMap + FetchedCounts(key, qualifierWithCounts) + } + } match { + case Success(rst) => rst + case Failure(ex) => + log.error(s"$ex: $messageForLog") + Nil + } + } + } + + override def get(policy: Counter, + items: Seq[String], + timeRange: Seq[(core.TimedQualifier, core.TimedQualifier)], + dimQuery: Map[String, Set[String]]) + (implicit ec: ExecutionContext): Future[Seq[FetchedCountsGrouped]] = { + get(policy, items, timeRange).map { fetchedLs => + for { + FetchedCounts(exactKey, qualifierWithCountMap) <- fetchedLs + } yield { + val intervalWithCountMap = qualifierWithCountMap + .filter { case (eq, v) => eq.checkDimensionEquality(dimQuery) } + .groupBy { case (eq, v) => (eq.tq.q, eq.dimKeyValues) } + FetchedCountsGrouped(exactKey, intervalWithCountMap) + } + } + } + + override def update(policy: Counter, counts: Seq[(ExactKeyTrait, ExactValueMap)]): Map[ExactKeyTrait, ExactValueMap] = { + // increment mutation to hbase + val increments = { + for { + (exactKey, values) <- counts + inc = new Increment(BytesUtilV1.toBytes(exactKey)) + } yield { + for { + (eq, value) <- values + } { + inc.addColumn(intervalsMap.apply(eq.tq.q).toString.getBytes, BytesUtilV1.toBytes(eq), value) + } + // add column by dimension + inc + } + } + + val results: Array[Object] = Array.fill(increments.size)(null) + + withHBase(getTableName(policy)) { table => + table.batch(increments, results) + } match { + case Failure(ex) => + log.error(s"${ex.getMessage}") + case _ => + } + + assert(counts.length == results.length) + + for { + ((exactKey, eqWithValue), result) <- counts.zip(results) + } yield { + val eqWithResult = result match { + case r: Result => + for { + (eq, value) <- eqWithValue + } yield { + val interval = eq.tq.q + val cf = intervalsMap(interval) + val result = Option(r.getColumnLatestCell(cf.toString.getBytes, BytesUtilV1.toBytes(eq))).map { cell => + Bytes.toLong(CellUtil.cloneValue(cell)) + }.getOrElse(-1l) + eq -> result + } + case ex: Throwable => + log.error(s"${ex.getMessage}: $exactKey") + Nil + case _ => + log.error(s"result is null: $exactKey") + Nil + } + (exactKey, eqWithResult.toMap) + } + }.toMap + + override def delete(policy: Counter, keys: Seq[ExactKeyTrait]): Unit = { + withHBase(getTableName(policy)) { table => + table.delete { + for { + key <- keys + } yield { + new Delete(BytesUtilV1.toBytes(key)) + } 
+ } + } match { + case Failure(ex) => + log.error(ex.getMessage) + case _ => + } + } + + override def get(policy: Counter, + queries: Seq[(ExactKeyTrait, Seq[core.ExactQualifier])]) + (implicit ec: ExecutionContext): Future[Seq[FetchedCounts]] = { + lazy val messageForLog = s"${policy.service}.${policy.action} $queries" + + val gets = { + for { + (key, eqs) <- queries + } yield { + // println(s"$key $eqsGrouped") + val get = new Get(BytesUtilV1.toBytes(key)) + + for { + eq <- eqs + } { + val cf = intervalsMap(eq.tq.q) + get.addColumn(cf.toString.getBytes, BytesUtilV1.toBytes(eq)) + } + get + } + } + + Future { + withHBase(getTableName(policy)) { table => + for { + (rst, key) <- table.get(gets).zip(queries.map(_._1)) if !rst.isEmpty + } yield { + val qualifierWithCounts = { + for { + cell <- rst.listCells() + eq = BytesUtilV1.toExactQualifier(CellUtil.cloneQualifier(cell)) + } yield { + eq -> Bytes.toLong(CellUtil.cloneValue(cell)) + } + }.toMap + FetchedCounts(key, qualifierWithCounts) + } + } match { + case Success(rst) => rst.toSeq + case Failure(ex) => + log.error(s"$ex: $messageForLog") + Nil + } + } + } + + override def insertBlobValue(policy: Counter, keys: Seq[BlobExactKey]): Seq[Boolean] = { + val results: Array[Object] = Array.fill(keys.size)(null) + + val puts = keys.map { key => + val put = new Put(BytesUtilV1.toBytes(key)) + put.addColumn(blobCF, blobColumn, key.itemId.getBytes) + } + + withHBase(getTableName(policy)) { table => + table.batch(puts, results) + } match { + case Failure(ex) => + log.error(s"${ex.getMessage}") + case _ => + } + + for { + (result, key) <- results.zip(keys) + } yield { + Option(result).map(_ => true).getOrElse { + log.error(s"fail to insert blob value: $key") + false + } + } + } + + override def getBlobValue(policy: Counter, blobId: String): Option[String] = { + lazy val messageForLog = s"${policy.service}.${policy.action}.$blobId" + + policy.itemType match { + case ItemType.BLOB => + withHBase(getTableName(policy)) { table => + val rowKey = BytesUtilV1.toBytes(ExactKey(policy.id, policy.version, policy.itemType, blobId)) + val get = new Get(rowKey) + get.addColumn(blobCF, blobColumn) + table.get(get) + } match { + case Success(result) => + Option(result).filter(!_.isEmpty).map { rst => + Bytes.toString(rst.getValue(blobCF, blobColumn)) + } + case Failure(ex) => + throw ex + } + case _ => + log.warn(s"is not blob type counter. $messageForLog") + throw new Exception(s"is not blob type counter. 
$messageForLog") + } + } + + override def prepare(policy: Counter): Unit = { + // create hbase table + policy.hbaseTable.foreach { table => + + if (!hbaseManagement.tableExists(s2config.HBASE_ZOOKEEPER_QUORUM, table)) { + hbaseManagement.createTable(s2config.HBASE_ZOOKEEPER_QUORUM, table, + ColumnFamily.values.map(_.toString).toList, 1) + hbaseManagement.setTTL(s2config.HBASE_ZOOKEEPER_QUORUM, table, ColumnFamily.SHORT.toString, policy.ttl) + policy.dailyTtl.foreach { i => + hbaseManagement.setTTL(s2config.HBASE_ZOOKEEPER_QUORUM, table, ColumnFamily.LONG.toString, i * 24 * 60 * 60) + } + } + } + } + + override def destroy(policy: Counter): Unit = { + + } + + override def ready(policy: Counter): Boolean = { + policy.hbaseTable.map { table => + hbaseManagement.tableExists(s2config.HBASE_ZOOKEEPER_QUORUM, table) + }.getOrElse(true) + } +} + +object ExactStorageHBase { + import core.TimedQualifier.IntervalUnit._ + + object ColumnFamily extends Enumeration { + type ColumnFamily = Value + + val SHORT = Value("s") + val LONG = Value("l") + } + + val blobCF = ColumnFamily.LONG.toString.getBytes + val blobColumn = "b".getBytes + + val intervalsMap = Map( + MINUTELY -> ColumnFamily.SHORT, + HOURLY -> ColumnFamily.SHORT, + DAILY -> ColumnFamily.LONG, + MONTHLY -> ColumnFamily.LONG, + TOTAL -> ColumnFamily.LONG + ) +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/v1/RankingStorageRedis.scala ---------------------------------------------------------------------- diff --git a/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/v1/RankingStorageRedis.scala b/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/v1/RankingStorageRedis.scala new file mode 100644 index 0000000..15ff380 --- /dev/null +++ b/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/v1/RankingStorageRedis.scala @@ -0,0 +1,185 @@ +package org.apache.s2graph.counter.core.v1 + +import java.lang + +import com.typesafe.config.Config +import org.apache.s2graph.counter.core.RankingCounter.RankingValueMap +import org.apache.s2graph.counter.core.TimedQualifier.IntervalUnit +import org.apache.s2graph.counter.core.{RankingResult, RankingKey, RankingStorage} +import org.apache.s2graph.counter.helper.WithRedis +import org.apache.s2graph.counter.models.{Counter, CounterModel} +import org.slf4j.LoggerFactory +import redis.clients.jedis.Pipeline +import scala.collection.JavaConversions._ +import scala.util.{Failure, Success} + +class RankingStorageRedis(config: Config) extends RankingStorage { + private[counter] val log = LoggerFactory.getLogger(this.getClass) + private[counter] val withRedis = new WithRedis(config) + + val counterModel = new CounterModel(config) + + val TOTAL = "_total_" + + /** + * ex1) + * dimension = "age.32" + * ex2) + * dimension = "age.gender.32.m" + * + */ + private def makeBucket(rankingKey: RankingKey): String = { + val policyId = rankingKey.policyId + val q = rankingKey.eq.tq.q + val ts = rankingKey.eq.tq.ts + val dimension = rankingKey.eq.dimension + if (dimension.nonEmpty) { + s"$policyId.$q.$ts.$dimension" + } + else { + s"$policyId.$q.$ts" + } + } + + override def getTopK(rankingKey: RankingKey, k: Int): Option[RankingResult] = { + val bucket = makeBucket(rankingKey) + withRedis.doBlockWithKey(bucket) { jedis => + jedis.zrevrangeByScoreWithScores(bucket, "+inf", "-inf", 0, k + 1).toSeq.map(t => (t.getElement, t.getScore)) + } match { + case Success(values) => + if (values.nonEmpty) { +// println(values) + 
Some(RankingResult(values.find(_._1 == TOTAL).map(_._2).getOrElse(-1d), values.filter(_._1 != TOTAL).take(k))) + } + else { + None + } + case Failure(ex) => + log.error(s"fail to get top k($ex). $rankingKey") + None + } + } + + private def getTTL(policyId: Int, intervalUnit: IntervalUnit.IntervalUnit): Option[Int] = { + counterModel.findById(policyId).flatMap { policy => + intervalUnit match { + case IntervalUnit.MINUTELY => Some(policy.ttl) + case IntervalUnit.HOURLY => Some(policy.ttl) + // default daily ttl 31 day + case IntervalUnit.DAILY => Some(policy.dailyTtl.getOrElse(31) * 24 * 3600) + case IntervalUnit.MONTHLY => policy.dailyTtl + case IntervalUnit.TOTAL => policy.dailyTtl + } + } + } + + override def update(key: RankingKey, value: RankingValueMap, k: Int): Unit = { + // update ranking by score + val bucket = makeBucket(key) + withRedis.doBlockWithKey(bucket) { jedis => + val pipeline = jedis.pipelined() + updateItem(pipeline, bucket, key, value, k) + pipeline.sync() + } match { + case Failure(ex) => + log.error(s"fail to update $key $value: $ex") + case _ => + } + } + + private def updateItem(pipeline: Pipeline, bucket: String, key: RankingKey, value: RankingValueMap, k: Int): Unit = { + val topSeq = value.map { case (item, rv) => + // jedis client accept only java's double + item -> rv.score.asInstanceOf[lang.Double] + }.toSeq.sortBy(_._2).takeRight(k) + pipeline.zadd(bucket, topSeq.toMap[String, lang.Double]) + pipeline.zincrby(bucket, value.mapValues(_.increment).values.sum, TOTAL) + pipeline.zremrangeByRank(bucket, 0, -(k + 1)) + // if ttl defined, set expire time to bucket + getTTL(key.policyId, key.eq.tq.q).foreach { ttl => + pipeline.expire(bucket, ttl) + } + } + + override def update(values: Seq[(RankingKey, RankingValueMap)], k: Int): Unit = { + values.map { case (key, value) => + (makeBucket(key), key, value) + }.groupBy { case (bucket, key, value) => + withRedis.getBucketIdx(bucket) + }.foreach { case (idx, seq) => + withRedis.doBlockWithIndex(idx) { jedis => + val pipeline = jedis.pipelined() + for { + (bucket, key, value) <- seq + } { + updateItem(pipeline, bucket, key, value, k) + } + pipeline.sync() + } match { + case Failure(ex) => + log.error(s"fail to update multi $idx: $ex") + case _ => + } + } + } + + override def delete(key: RankingKey): Unit = { + val bucket = makeBucket(key) + withRedis.doBlockWithKey(bucket) { jedis => + jedis.del(bucket) + } match { + case Success(deleted) => + log.info(s"success to delete $key") + case Failure(ex) => + log.error(s"fail to delete $key") + } + } + + override def getTopK(keys: Seq[RankingKey], k: Int): Seq[(RankingKey, RankingResult)] = { + keys.map { key => + (makeBucket(key), key) + }.groupBy { case (bucket, key) => + withRedis.getBucketIdx(bucket) + }.toSeq.par.flatMap { case (idx, seq) => + withRedis.doBlockWithIndex(idx) { jedis => + val pipeline = jedis.pipelined() + val keyWithRespLs = { + for { + (bucket, rankingKey) <- seq + } yield { + (rankingKey, pipeline.zrevrangeByScoreWithScores(bucket, "+inf", "-inf", 0, k + 1)) + } + } + pipeline.sync() + for { + (rankingKey, resp) <- keyWithRespLs + } yield { + (rankingKey, resp.get().toSeq.map { t => (t.getElement, t.getScore)}) + } + } match { + case Success(keyWithValues) => + for { + (rankingKey, values) <- keyWithValues + } yield { + val result = RankingResult(values.find(_._1 == TOTAL).map(_._2).getOrElse(-1d), values.filter(_._1 != TOTAL).take(k)) + (rankingKey, result) + } + case Failure(ex) => + Nil + } + } + }.seq + + override def prepare(policy: Counter): Unit 
= { + // do nothing + } + + override def destroy(policy: Counter): Unit = { + + } + + override def ready(policy: Counter): Boolean = { + // always return true + true + } +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/v2/BytesUtilV2.scala ---------------------------------------------------------------------- diff --git a/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/v2/BytesUtilV2.scala b/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/v2/BytesUtilV2.scala new file mode 100644 index 0000000..a37506c --- /dev/null +++ b/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/v2/BytesUtilV2.scala @@ -0,0 +1,88 @@ +package org.apache.s2graph.counter.core.v2 + +import org.apache.hadoop.hbase.util._ +import org.apache.s2graph.counter +import org.apache.s2graph.counter.core.TimedQualifier.IntervalUnit +import org.apache.s2graph.counter.core.{TimedQualifier, ExactQualifier, ExactKeyTrait, BytesUtil} +import org.apache.s2graph.counter.models.Counter.ItemType +import org.apache.s2graph.counter.util.Hashes +import scala.collection.mutable.ArrayBuffer + +object BytesUtilV2 extends BytesUtil { + // ExactKey: [hash(1b)][version(1b)][policy(4b)][item(variable)] + val BUCKET_BYTE_SIZE = Bytes.SIZEOF_BYTE + val VERSION_BYTE_SIZE = Bytes.SIZEOF_BYTE + val POLICY_ID_SIZE = Bytes.SIZEOF_INT + + val INTERVAL_SIZE = Bytes.SIZEOF_BYTE + val TIMESTAMP_SIZE = Bytes.SIZEOF_LONG + val TIMED_QUALIFIER_SIZE = INTERVAL_SIZE + TIMESTAMP_SIZE + + override def getRowKeyPrefix(id: Int): Array[Byte] = { + Array(counter.VERSION_2) ++ Bytes.toBytes(id) + } + + override def toBytes(key: ExactKeyTrait): Array[Byte] = { + val buff = new ArrayBuffer[Byte] + // hash byte + buff ++= Bytes.toBytes(Hashes.murmur3(key.itemKey)).take(BUCKET_BYTE_SIZE) + + // row key prefix + // version + policy id + buff ++= getRowKeyPrefix(key.policyId) + + buff ++= { + key.itemType match { + case ItemType.INT => Bytes.toBytes(key.itemKey.toInt) + case ItemType.LONG => Bytes.toBytes(key.itemKey.toLong) + case ItemType.STRING | ItemType.BLOB => Bytes.toBytes(key.itemKey) + } + } + buff.toArray + } + + override def toBytes(eq: ExactQualifier): Array[Byte] = { + val len = eq.dimKeyValues.map { case (k, v) => k.length + 2 + v.length + 2 }.sum + val pbr = new SimplePositionedMutableByteRange(len) + for { + v <- ExactQualifier.makeSortedDimension(eq.dimKeyValues) + } { + OrderedBytes.encodeString(pbr, v, Order.ASCENDING) + } + toBytes(eq.tq) ++ pbr.getBytes + } + + override def toBytes(tq: TimedQualifier): Array[Byte] = { + val pbr = new SimplePositionedMutableByteRange(INTERVAL_SIZE + 2 + TIMESTAMP_SIZE + 1) + OrderedBytes.encodeString(pbr, tq.q.toString, Order.ASCENDING) + OrderedBytes.encodeInt64(pbr, tq.ts, Order.DESCENDING) + pbr.getBytes + } + + private def decodeString(pbr: PositionedByteRange): Stream[String] = { + if (pbr.getRemaining > 0) { + Stream.cons(OrderedBytes.decodeString(pbr), decodeString(pbr)) + } + else { + Stream.empty + } + } + + override def toExactQualifier(bytes: Array[Byte]): ExactQualifier = { + val pbr = new SimplePositionedByteRange(bytes) + ExactQualifier(toTimedQualifier(pbr), { + val seqStr = decodeString(pbr).toSeq + val (keys, values) = seqStr.splitAt(seqStr.length / 2) + keys.zip(values).toMap + }) + } + + override def toTimedQualifier(bytes: Array[Byte]): TimedQualifier = { + val pbr = new SimplePositionedByteRange(bytes) + toTimedQualifier(pbr) + } + + def toTimedQualifier(pbr: 
PositionedByteRange): TimedQualifier = { + TimedQualifier(IntervalUnit.withName(OrderedBytes.decodeString(pbr)), OrderedBytes.decodeInt64(pbr)) + } +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/v2/ExactStorageGraph.scala ---------------------------------------------------------------------- diff --git a/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/v2/ExactStorageGraph.scala b/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/v2/ExactStorageGraph.scala new file mode 100644 index 0000000..522cf18 --- /dev/null +++ b/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/v2/ExactStorageGraph.scala @@ -0,0 +1,343 @@ +package org.apache.s2graph.counter.core.v2 + +import com.typesafe.config.Config +import org.apache.http.HttpStatus +import org.apache.s2graph.core.mysqls.Label +import org.apache.s2graph.counter.config.S2CounterConfig +import org.apache.s2graph.counter.core +import org.apache.s2graph.counter.core.ExactCounter.ExactValueMap +import org.apache.s2graph.counter.core._ +import org.apache.s2graph.counter.models.Counter +import org.apache.s2graph.counter.util.CartesianProduct +import org.slf4j.LoggerFactory +import play.api.libs.json._ +import scala.concurrent.duration._ +import scala.concurrent.{Await, ExecutionContext, Future} + +object ExactStorageGraph { + case class RespGraph(success: Boolean, result: Long) + implicit val respGraphFormat = Json.format[RespGraph] + + // using play-ws without play app + private val builder = new com.ning.http.client.AsyncHttpClientConfig.Builder() + private val wsClient = new play.api.libs.ws.ning.NingWSClient(builder.build) +} + +case class ExactStorageGraph(config: Config) extends ExactStorage { + private val log = LoggerFactory.getLogger(this.getClass) + private val s2config = new S2CounterConfig(config) + + private val SERVICE_NAME = "s2counter" + private val COLUMN_NAME = "bucket" + private val labelPostfix = "_counts" + + val s2graphUrl = s2config.GRAPH_URL + val s2graphReadOnlyUrl = s2config.GRAPH_READONLY_URL + val graphOp = new GraphOperation(config) + + import ExactStorageGraph._ + + override def update(policy: Counter, counts: Seq[(ExactKeyTrait, ExactValueMap)]): Map[ExactKeyTrait, ExactValueMap] = { + import scala.concurrent.ExecutionContext.Implicits.global + + val (keyWithEq, reqJsLs) = toIncrementCountRequests(policy, counts).unzip(x => ((x._1, x._2), x._3)) + + val future = wsClient.url(s"$s2graphUrl/graphs/edges/incrementCount").post(Json.toJson(reqJsLs)).map { resp => + resp.status match { + case HttpStatus.SC_OK => + val respSeq = resp.json.as[Seq[RespGraph]] + + val keyWithEqResult = { + for { + ((key, eq), RespGraph(success, result)) <- keyWithEq.zip(respSeq) + } yield { + (key, (eq, result)) + } + }.groupBy(_._1).mapValues{ seq => seq.map(_._2).toMap } + keyWithEqResult + case _ => + throw new RuntimeException(s"update failed: $policy $counts") + } + } + Await.result(future, 10 second) + } + + def delete(policy: Counter, keys: Seq[ExactKeyTrait]): Unit = { + + } + + private def toIncrementCountRequests(policy: Counter, + counts: Seq[(ExactKeyTrait, ExactValueMap)]) + : Seq[(ExactKeyTrait, core.ExactQualifier, JsValue)] = { + val labelName = policy.action + labelPostfix + val timestamp = System.currentTimeMillis() + for { + (exactKey, values) <- counts + (eq, value) <- values + } yield { + val from = exactKey.itemKey + val to = eq.dimension + val json = Json.obj( + "timestamp" -> timestamp, + "operation" 
-> "incrementCount", + "from" -> from, + "to" -> to, + "label" -> labelName, + "props" -> Json.obj( + "_count" -> value, + "time_unit" -> eq.tq.q.toString, + "time_value" -> eq.tq.ts + ) + ) + (exactKey, eq, json) + } + } + + override def get(policy: Counter, + items: Seq[String], + timeRange: Seq[(core.TimedQualifier, core.TimedQualifier)], + dimQuery: Map[String, Set[String]]) + (implicit ex: ExecutionContext): Future[Seq[FetchedCountsGrouped]] = { + val labelName = policy.action + labelPostfix + val label = Label.findByName(labelName).get +// val label = labelModel.findByName(labelName).get + + val ids = Json.toJson(items) + + val dimensions = { + for { + values <- CartesianProduct(dimQuery.values.map(ss => ss.toList).toList) + } yield { + dimQuery.keys.zip(values).toMap + } + } + + val stepJsLs = { + for { + (tqFrom, tqTo) <- timeRange + dimension <- dimensions + } yield { + val eqFrom = core.ExactQualifier(tqFrom, dimension) + val eqTo = core.ExactQualifier(tqTo, dimension) + val intervalJs = + s""" + |{ + | "from": { + | "_to": "${eqFrom.dimension}", + | "time_unit": "${eqFrom.tq.q}", + | "time_value": ${eqFrom.tq.ts} + | }, + | "to": { + | "_to": "${eqTo.dimension}", + | "time_unit": "${eqTo.tq.q}", + | "time_value": ${eqTo.tq.ts + 1} + | } + |} + """.stripMargin + val stepJs = + s""" + |{ + | "direction": "out", + | "limit": -1, + | "duplicate": "raw", + | "label": "$labelName", + | "interval": $intervalJs + |} + """.stripMargin + stepJs + } + } + + val reqJsStr = + s""" + |{ + | "srcVertices": [ + | {"serviceName": "${policy.service}", "columnName": "${label.srcColumnName}", "ids": $ids} + | ], + | "steps": [ + | { + | "step": [ + | ${stepJsLs.mkString(",")} + | ] + | } + | ] + |} + """.stripMargin + + val reqJs = Json.parse(reqJsStr) +// log.warn(s"query: ${reqJs.toString()}") + + wsClient.url(s"$s2graphReadOnlyUrl/graphs/getEdges").post(reqJs).map { resp => + resp.status match { + case HttpStatus.SC_OK => + val respJs = resp.json +// println(respJs) + val keyWithValues = (respJs \ "results").as[Seq[JsValue]].map { result => +// println(s"result: $result") + resultToExactKeyValues(policy, result) + }.groupBy(_._1).mapValues(seq => seq.map(_._2).toMap.groupBy { case (eq, v) => (eq.tq.q, eq.dimKeyValues) }) + for { + (k, v) <- keyWithValues.toSeq + } yield { + FetchedCountsGrouped(k, v) + } + case n: Int => + log.warn(s"getEdges status($n): $reqJsStr") +// println(s"getEdges status($n): $reqJsStr") + Nil + } + } + } + + private def resultToExactKeyValues(policy: Counter, result: JsValue): (ExactKeyTrait, (core.ExactQualifier, Long)) = { + val from = result \ "from" match { + case s: JsString => s.as[String] + case n: JsNumber => n.as[Long].toString + case x: JsValue => throw new RuntimeException(s"$x's type must be string or number") + } + val dimension = (result \ "to").as[String] + val props = result \ "props" + val count = (props \ "_count").as[Long] + val timeUnit = (props \ "time_unit").as[String] + val timeValue = (props \ "time_value").as[Long] + (ExactKey(policy, from, checkItemType = true), (core.ExactQualifier(core.TimedQualifier(timeUnit, timeValue), dimension), count)) + } + + private def getInner(policy: Counter, key: ExactKeyTrait, eqs: Seq[core.ExactQualifier]) + (implicit ex: ExecutionContext): Future[Seq[FetchedCounts]] = { + val labelName = policy.action + labelPostfix + + Label.findByName(labelName) match { + case Some(label) => + + val src = Json.obj("serviceName" -> policy.service, "columnName" -> label.srcColumnName, "id" -> key.itemKey) + val step = { + val 
stepLs = { + for { + eq <- eqs + } yield { + val from = Json.obj("_to" -> eq.dimension, "time_unit" -> eq.tq.q.toString, "time_value" -> eq.tq.ts) + val to = Json.obj("_to" -> eq.dimension, "time_unit" -> eq.tq.q.toString, "time_value" -> eq.tq.ts) + val interval = Json.obj("from" -> from, "to" -> to) + Json.obj("limit" -> 1, "label" -> labelName, "interval" -> interval) + } + } + Json.obj("step" -> stepLs) + } + val query = Json.obj("srcVertices" -> Json.arr(src), "steps" -> Json.arr(step)) + // println(s"query: ${query.toString()}") + + wsClient.url(s"$s2graphReadOnlyUrl/graphs/getEdges").post(query).map { resp => + resp.status match { + case HttpStatus.SC_OK => + val respJs = resp.json + val keyWithValues = (respJs \ "results").as[Seq[JsValue]].map { result => + resultToExactKeyValues(policy, result) + }.groupBy(_._1).mapValues(seq => seq.map(_._2).toMap) + for { + (key, eqWithValues) <- keyWithValues.toSeq + } yield { + FetchedCounts(key, eqWithValues) + } + case _ => + Nil + } + } + case None => + Future.successful(Nil) + } + } + + // for query exact qualifier + override def get(policy: Counter, queries: Seq[(ExactKeyTrait, Seq[core.ExactQualifier])]) + (implicit ex: ExecutionContext): Future[Seq[FetchedCounts]] = { + val futures = { + for { + (key, eqs) <- queries + } yield { +// println(s"$key $eqs") + getInner(policy, key, eqs) + } + } + Future.sequence(futures).map(_.flatten) + } + + override def getBlobValue(policy: Counter, blobId: String): Option[String] = { + throw new RuntimeException("unsupported getBlobValue operation") + } + + override def insertBlobValue(policy: Counter, keys: Seq[BlobExactKey]): Seq[Boolean] = { + throw new RuntimeException("unsupported insertBlobValue operation") + } + + private def existsLabel(policy: Counter, useCache: Boolean = true): Boolean = { + val action = policy.action + val counterLabelName = action + labelPostfix + + Label.findByName(counterLabelName, useCache).nonEmpty + } + + override def prepare(policy: Counter): Unit = { + val service = policy.service + val action = policy.action + + val graphLabel = Label.findByName(action) + if (graphLabel.isEmpty) { + throw new Exception(s"label not found. 
$service.$action") + } + + if (!existsLabel(policy, useCache = false)) { + val label = Label.findByName(action, useCache = false).get + + val counterLabelName = action + labelPostfix + val defaultJson = + s""" + |{ + | "label": "$counterLabelName", + | "srcServiceName": "$service", + | "srcColumnName": "${label.tgtColumnName}", + | "srcColumnType": "${label.tgtColumnType}", + | "tgtServiceName": "$SERVICE_NAME", + | "tgtColumnName": "$COLUMN_NAME", + | "tgtColumnType": "string", + | "indices": [ + | {"name": "time", "propNames": ["_to", "time_unit", "time_value"]} + | ], + | "props": [ + | {"name": "time_unit", "dataType": "string", "defaultValue": ""}, + | {"name": "time_value", "dataType": "long", "defaultValue": 0} + | ], + | "hTableName": "${policy.hbaseTable.get}" + |} + """.stripMargin + val json = policy.dailyTtl.map(ttl => ttl * 24 * 60 * 60) match { + case Some(ttl) => + Json.parse(defaultJson).as[JsObject] + ("hTableTTL" -> Json.toJson(ttl)) + case None => + Json.parse(defaultJson) + } + + graphOp.createLabel(json) + } + } + + override def destroy(policy: Counter): Unit = { + val action = policy.action + + if (existsLabel(policy, useCache = false)) { + val counterLabelName = action + labelPostfix + + graphOp.deleteLabel(counterLabelName) + } + } + + override def ready(policy: Counter): Boolean = { + existsLabel(policy) + } + + // for range query + override def get(policy: Counter, items: Seq[String], timeRange: Seq[(core.TimedQualifier, core.TimedQualifier)]) + (implicit ec: ExecutionContext): Future[Seq[FetchedCounts]] = { + throw new NotImplementedError("Not implemented") + } +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/v2/GraphOperation.scala ---------------------------------------------------------------------- diff --git a/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/v2/GraphOperation.scala b/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/v2/GraphOperation.scala new file mode 100644 index 0000000..d1bb2ef --- /dev/null +++ b/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/v2/GraphOperation.scala @@ -0,0 +1,48 @@ +package org.apache.s2graph.counter.core.v2 + +import com.typesafe.config.Config +import org.apache.http.HttpStatus +import org.apache.s2graph.counter.config.S2CounterConfig +import org.slf4j.LoggerFactory +import play.api.libs.json.{JsObject, JsValue, Json} +import scala.concurrent.Await +import scala.concurrent.duration._ + +class GraphOperation(config: Config) { + // using play-ws without play app + private val builder = new com.ning.http.client.AsyncHttpClientConfig.Builder() + private val wsClient = new play.api.libs.ws.ning.NingWSClient(builder.build) + private val s2config = new S2CounterConfig(config) + val s2graphUrl = s2config.GRAPH_URL + private[counter] val log = LoggerFactory.getLogger(this.getClass) + + import scala.concurrent.ExecutionContext.Implicits.global + + def createLabel(json: JsValue): Boolean = { + // fix counter label's schemaVersion + val newJson = json.as[JsObject] ++ Json.obj("schemaVersion" -> "v2") + val future = wsClient.url(s"$s2graphUrl/graphs/createLabel").post(newJson).map { resp => + resp.status match { + case HttpStatus.SC_OK => + true + case _ => + throw new RuntimeException(s"failed createLabel. 
errCode: ${resp.status} body: ${resp.body} query: $json") + } + } + + Await.result(future, 10 second) + } + + def deleteLabel(label: String): Boolean = { + val future = wsClient.url(s"$s2graphUrl/graphs/deleteLabel/$label").put("").map { resp => + resp.status match { + case HttpStatus.SC_OK => + true + case _ => + throw new RuntimeException(s"failed deleteLabel. errCode: ${resp.status} body: ${resp.body}") + } + } + + Await.result(future, 10 second) + } +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/v2/RankingStorageGraph.scala ---------------------------------------------------------------------- diff --git a/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/v2/RankingStorageGraph.scala b/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/v2/RankingStorageGraph.scala new file mode 100644 index 0000000..add4b04 --- /dev/null +++ b/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/v2/RankingStorageGraph.scala @@ -0,0 +1,395 @@ +package org.apache.s2graph.counter.core.v2 + +import com.typesafe.config.Config +import org.apache.commons.httpclient.HttpStatus +import org.apache.s2graph.core.GraphUtil +import org.apache.s2graph.core.mysqls.Label +import org.apache.s2graph.counter.config.S2CounterConfig +import org.apache.s2graph.counter.core.RankingCounter.RankingValueMap +import org.apache.s2graph.counter.core.{RankingResult, RankingKey, RankingStorage} +import org.apache.s2graph.counter.models.{Counter, CounterModel} +import org.apache.s2graph.counter.util.{CollectionCacheConfig, CollectionCache} +import org.slf4j.LoggerFactory +import play.api.libs.json.{JsObject, JsString, JsValue, Json} +import scala.concurrent.duration._ +import scala.concurrent.{Await, Future} +import scala.util.hashing.MurmurHash3 + +object RankingStorageGraph { + // using play-ws without play app + private val builder = new com.ning.http.client.AsyncHttpClientConfig.Builder() + private val wsClient = new play.api.libs.ws.ning.NingWSClient(builder.build) +} + +class RankingStorageGraph(config: Config) extends RankingStorage { + import RankingStorageGraph._ + + private[counter] val log = LoggerFactory.getLogger(this.getClass) + private val s2config = new S2CounterConfig(config) + + private val BUCKET_SHARD_COUNT = 53 + private val SERVICE_NAME = "s2counter" + private val BUCKET_COLUMN_NAME = "bucket" + private val counterModel = new CounterModel(config) + private val labelPostfix = "_topK" + + val s2graphUrl = s2config.GRAPH_URL + val s2graphReadOnlyUrl = s2config.GRAPH_READONLY_URL + + val prepareCache = new CollectionCache[Option[Boolean]](CollectionCacheConfig(10000, 600)) + val graphOp = new GraphOperation(config) + import scala.concurrent.ExecutionContext.Implicits.global + + private def makeBucketKey(rankingKey: RankingKey): String = { + val eq = rankingKey.eq + val tq = eq.tq + s"${tq.q}.${tq.ts}.${eq.dimension}" + } + + // "", "age.32", "age.gender.32.M" + private def makeBucketShardKey(shardIdx: Int, rankingKey: RankingKey): String = { + s"$shardIdx.${makeBucketKey(rankingKey)}" + } + + /** + * indexProps: ["time_unit", "time_value", "score"] + */ + override def getTopK(key: RankingKey, k: Int): Option[RankingResult] = { + getTopK(Seq(key), k).headOption.map(_._2) + } + + override def getTopK(keys: Seq[RankingKey], k: Int): Seq[(RankingKey, RankingResult)] = { + val futures = for { + key <- keys + } yield { + getEdges(key).map { edges => + key -> RankingResult(0d, 
toWithScoreLs(edges).take(k)) + } + } + + Await.result(Future.sequence(futures), 10 seconds) + } + + override def update(key: RankingKey, value: RankingValueMap, k: Int): Unit = { + update(Seq((key, value)), k) + } + + override def update(values: Seq[(RankingKey, RankingValueMap)], k: Int): Unit = { + val futures = { + for { + (key, value) <- values + } yield { + // prepare dimension bucket edge + checkAndPrepareDimensionBucket(key) + + val future = getEdges(key, "raw").flatMap { edges => + val prevRankingSeq = toWithScoreLs(edges) + val prevRankingMap: Map[String, Double] = prevRankingSeq.groupBy(_._1).map(_._2.sortBy(-_._2).head) + val currentRankingMap: Map[String, Double] = value.mapValues(_.score) + val mergedRankingSeq = (prevRankingMap ++ currentRankingMap).toSeq.sortBy(-_._2).take(k) + val mergedRankingMap = mergedRankingSeq.toMap + + val bucketRankingSeq = mergedRankingSeq.groupBy { case (itemId, score) => + // 0-index + GraphUtil.transformHash(MurmurHash3.stringHash(itemId)) % BUCKET_SHARD_COUNT + }.map { case (shardIdx, groupedRanking) => + shardIdx -> groupedRanking.filter { case (itemId, _) => currentRankingMap.contains(itemId) } + }.toSeq + + insertBulk(key, bucketRankingSeq).flatMap { _ => + val duplicatedItems = prevRankingMap.filterKeys(s => currentRankingMap.contains(s)) + val cutoffItems = prevRankingMap.filterKeys(s => !mergedRankingMap.contains(s)) + val deleteItems = duplicatedItems ++ cutoffItems + + val keyWithEdgesLs = prevRankingSeq.map(_._1).zip(edges) + val deleteEdges = keyWithEdgesLs.filter{ case (s, _) => deleteItems.contains(s) }.map(_._2) + + deleteAll(deleteEdges) + } + } + + future + } + } + + Await.result(Future.sequence(futures), 10 seconds) + } + + private def toWithScoreLs(edges: List[JsValue]): List[(String, Double)] = { + for { + edgeJson <- edges + to = (edgeJson \ "to").as[JsValue] + score = (edgeJson \ "score").as[JsValue].toString().toDouble + } yield { + val toValue = to match { + case s: JsString => s.as[String] + case _ => to.toString() + } + toValue -> score + } + } + + private def insertBulk(key: RankingKey, newRankingSeq: Seq[(Int, Seq[(String, Double)])]): Future[Boolean] = { + val labelName = counterModel.findById(key.policyId).get.action + labelPostfix + val timestamp: Long = System.currentTimeMillis + val payload = Json.toJson { + for { + (shardIdx, rankingSeq) <- newRankingSeq + (itemId, score) <- rankingSeq + } yield { + val srcId = makeBucketShardKey(shardIdx, key) + Json.obj( + "timestamp" -> timestamp, + "from" -> srcId, + "to" -> itemId, + "label" -> labelName, + "props" -> Json.obj( + "time_unit" -> key.eq.tq.q.toString, + "time_value" -> key.eq.tq.ts, + "date_time" -> key.eq.tq.dateTime, + "score" -> score + ) + ) + } + } + + wsClient.url(s"$s2graphUrl/graphs/edges/insertBulk").post(payload).map { resp => + resp.status match { + case HttpStatus.SC_OK => + true + case _ => + throw new RuntimeException(s"failed insertBulk. errCode: ${resp.status}, body: ${resp.body}, query: $payload") + } + } + } + + private def deleteAll(edges: List[JsValue]): Future[Boolean] = { + // /graphs/edges/delete + val futures = { + for { + groupedEdges <- edges.grouped(50) + } yield { + val payload = Json.toJson(groupedEdges) + wsClient.url(s"$s2graphUrl/graphs/edges/delete").post(payload).map { resp => + resp.status match { + case HttpStatus.SC_OK => + true + case _ => + log.error(s"failed delete. 
errCode: ${resp.status}, body: ${resp.body}, query: $payload") + false + } + } + } + }.toSeq + + Future.sequence(futures).map { seq => + seq.forall(x => x) + } + } + + /** select and delete */ + override def delete(key: RankingKey): Unit = { + val future = getEdges(key).flatMap { edges => + deleteAll(edges) + } + Await.result(future, 10 second) + } + + private def getEdges(key: RankingKey, duplicate: String="first"): Future[List[JsValue]] = { + val labelName = counterModel.findById(key.policyId).get.action + labelPostfix + + val ids = { + (0 until BUCKET_SHARD_COUNT).map { shardIdx => + s""""${makeBucketShardKey(shardIdx, key)}"""" + } + }.mkString(",") + + val strJs = + s""" + |{ + | "srcVertices": [ + | { + | "serviceName": "$SERVICE_NAME", + | "columnName": "$BUCKET_COLUMN_NAME", + | "ids": [$ids] + | } + | ], + | "steps": [ + | { + | "step": [ + | { + | "label": "$labelName", + | "duplicate": "$duplicate", + | "direction": "out", + | "offset": 0, + | "limit": -1, + | "interval": { + | "from": {"time_unit": "${key.eq.tq.q.toString}", "time_value": ${key.eq.tq.ts}}, + | "to": {"time_unit": "${key.eq.tq.q.toString}", "time_value": ${key.eq.tq.ts}} + | }, + | "scoring": {"score": 1} + | } + | ] + | } + | ] + |} + """.stripMargin + log.debug(strJs) + + val payload = Json.parse(strJs) + wsClient.url(s"$s2graphReadOnlyUrl/graphs/getEdges").post(payload).map { resp => + resp.status match { + case HttpStatus.SC_OK => + (resp.json \ "results").asOpt[List[JsValue]].getOrElse(Nil) + case _ => + throw new RuntimeException(s"failed getEdges. errCode: ${resp.status}, body: ${resp.body}, query: $payload") + } + } + } + + private def existsLabel(policy: Counter, useCache: Boolean = true): Boolean = { + val action = policy.action + val counterLabelName = action + labelPostfix + + Label.findByName(counterLabelName, useCache).nonEmpty + } + + private def checkAndPrepareDimensionBucket(rankingKey: RankingKey): Boolean = { + val dimension = rankingKey.eq.dimension + val bucketKey = makeBucketKey(rankingKey) + val labelName = "s2counter_topK_bucket" + + val prepared = prepareCache.withCache(s"$dimension:$bucketKey") { + val checkReqJs = Json.arr( + Json.obj( + "label" -> labelName, + "direction" -> "out", + "from" -> dimension, + "to" -> makeBucketShardKey(BUCKET_SHARD_COUNT - 1, rankingKey) + ) + ) + + val future = wsClient.url(s"$s2graphReadOnlyUrl/graphs/checkEdges").post(checkReqJs).map { resp => + resp.status match { + case HttpStatus.SC_OK => + val checkRespJs = resp.json + if (checkRespJs.as[Seq[JsValue]].nonEmpty) { + true + } else { + false + } + case _ => + // throw exception + throw new Exception(s"failed checkEdges. ${resp.body} ${resp.status}") + } + }.flatMap { + case true => Future.successful(Some(true)) + case false => + val insertReqJsLs = { + for { + i <- 0 until BUCKET_SHARD_COUNT + } yield { + Json.obj( + "timestamp" -> rankingKey.eq.tq.ts, + "from" -> dimension, + "to" -> makeBucketShardKey(i, rankingKey), + "label" -> labelName, + "props" -> Json.obj( + "time_unit" -> rankingKey.eq.tq.q.toString, + "date_time" -> rankingKey.eq.tq.dateTime + ) + ) + } + } + wsClient.url(s"$s2graphUrl/graphs/edges/insert").post(Json.toJson(insertReqJsLs)).map { resp => + resp.status match { + case HttpStatus.SC_OK => + Some(true) + case _ => + // throw exception + throw new Exception(s"failed insertEdges. 
+  override def prepare(policy: Counter): Unit = {
+    val service = policy.service
+    val action = policy.action
+
+    val graphLabel = {
+      policy.rateActionId match {
+        case Some(rateId) =>
+          counterModel.findById(rateId, useCache = false).flatMap { ratePolicy =>
+            Label.findByName(ratePolicy.action)
+          }
+        case None =>
+          Label.findByName(action)
+      }
+    }
+    if (graphLabel.isEmpty) {
+      throw new Exception(s"label not found. $service.$action")
+    }
+
+    if (!existsLabel(policy, useCache = false)) {
+      // find input label to specify target column
+      val inputLabelName = policy.rateActionId.flatMap { id =>
+        counterModel.findById(id, useCache = false).map(_.action)
+      }.getOrElse(action)
+      val label = graphLabel.get
+
+      val counterLabelName = action + labelPostfix
+      val defaultJson =
+        s"""
+           |{
+           |  "label": "$counterLabelName",
+           |  "srcServiceName": "$SERVICE_NAME",
+           |  "srcColumnName": "$BUCKET_COLUMN_NAME",
+           |  "srcColumnType": "string",
+           |  "tgtServiceName": "$service",
+           |  "tgtColumnName": "${label.tgtColumnName}",
+           |  "tgtColumnType": "${label.tgtColumnType}",
+           |  "indices": [
+           |    {"name": "time", "propNames": ["time_unit", "time_value", "score"]}
+           |  ],
+           |  "props": [
+           |    {"name": "time_unit", "dataType": "string", "defaultValue": ""},
+           |    {"name": "time_value", "dataType": "long", "defaultValue": 0},
+           |    {"name": "date_time", "dataType": "long", "defaultValue": 0},
+           |    {"name": "score", "dataType": "float", "defaultValue": 0.0}
+           |  ],
+           |  "hTableName": "${policy.hbaseTable.get}"
+           |}
+         """.stripMargin
+      val json = policy.dailyTtl.map(ttl => ttl * 24 * 60 * 60) match {
+        case Some(ttl) =>
+          Json.parse(defaultJson).as[JsObject] + ("hTableTTL" -> Json.toJson(ttl))
+        case None =>
+          Json.parse(defaultJson)
+      }
+
+      graphOp.createLabel(json)
+    }
+  }
+
+  override def destroy(policy: Counter): Unit = {
+    val action = policy.action
+
+    if (existsLabel(policy, useCache = false)) {
+      val counterLabelName = action + labelPostfix
+
+      graphOp.deleteLabel(counterLabelName)
+    }
+  }
+
+  override def ready(policy: Counter): Boolean = {
+    existsLabel(policy)
+  }
+}
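[Editor's note] One detail worth calling out in prepare: dailyTtl is a count of days while HBase's hTableTTL is expressed in seconds, hence the multiplication by 24 * 60 * 60. A worked example with a hypothetical 60-day retention, which yields the same 5184000-second figure the bucket label uses in CounterAdmin.setupCounterOnGraph below:

    // dailyTtl counts days; HBase TTLs are in seconds
    val dailyTtl: Option[Int] = Some(60) // illustrative value
    val hTableTTL: Option[Int] = dailyTtl.map(_ * 24 * 60 * 60) // Some(5184000), i.e. 60 days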
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2counter_core/src/main/scala/org/apache/s2graph/counter/decay/DecayFormula.scala
----------------------------------------------------------------------
diff --git a/s2counter_core/src/main/scala/org/apache/s2graph/counter/decay/DecayFormula.scala b/s2counter_core/src/main/scala/org/apache/s2graph/counter/decay/DecayFormula.scala
new file mode 100644
index 0000000..4e5da90
--- /dev/null
+++ b/s2counter_core/src/main/scala/org/apache/s2graph/counter/decay/DecayFormula.scala
@@ -0,0 +1,6 @@
+package org.apache.s2graph.counter.decay
+
+trait DecayFormula {
+  def apply(value: Double, millis: Long): Double
+  def apply(value: Double, seconds: Int): Double
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2counter_core/src/main/scala/org/apache/s2graph/counter/decay/ExpDecayFormula.scala
----------------------------------------------------------------------
diff --git a/s2counter_core/src/main/scala/org/apache/s2graph/counter/decay/ExpDecayFormula.scala b/s2counter_core/src/main/scala/org/apache/s2graph/counter/decay/ExpDecayFormula.scala
new file mode 100644
index 0000000..1075421
--- /dev/null
+++ b/s2counter_core/src/main/scala/org/apache/s2graph/counter/decay/ExpDecayFormula.scala
@@ -0,0 +1,25 @@
+package org.apache.s2graph.counter.decay
+
+case class ExpDecayFormula(halfLifeInMillis: Double) extends DecayFormula {
+  val decayRate = -Math.log(2) / halfLifeInMillis
+
+  override def apply(value: Double, millis: Long): Double = {
+    value * Math.pow(Math.E, decayRate * millis)
+  }
+
+  override def apply(value: Double, seconds: Int): Double = {
+    apply(value, seconds * 1000L)
+  }
+}
+
+object ExpDecayFormula {
+  @deprecated("do not use. just experimental", "0.14")
+  def byWindowTime(windowInMillis: Long, pct: Double): ExpDecayFormula = {
+    val halfLife = windowInMillis * Math.log(0.5) / Math.log(pct)
+    ExpDecayFormula(halfLife)
+  }
+
+  def byMeanLifeTime(millis: Long): ExpDecayFormula = {
+    ExpDecayFormula(millis * Math.log(2))
+  }
+}
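[Editor's note] ExpDecayFormula encodes the half-life semantics directly: decayRate = -ln(2) / halfLife, so after exactly one half-life a score is multiplied by e^(-ln 2) = 1/2, and byMeanLifeTime converts a mean lifetime tau into a half-life via t½ = tau * ln 2. A quick sanity check of those semantics (all values are illustrative):

    val oneDayMillis = 24 * 60 * 60 * 1000L
    val formula = ExpDecayFormula(halfLifeInMillis = oneDayMillis.toDouble) // 1-day half-life
    formula(100.0, oneDayMillis)     // ~= 50.0: one half-life halves the score
    formula(100.0, 2 * oneDayMillis) // ~= 25.0: two half-lives quarter it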
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2counter_core/src/main/scala/org/apache/s2graph/counter/helper/CounterAdmin.scala
----------------------------------------------------------------------
diff --git a/s2counter_core/src/main/scala/org/apache/s2graph/counter/helper/CounterAdmin.scala b/s2counter_core/src/main/scala/org/apache/s2graph/counter/helper/CounterAdmin.scala
new file mode 100644
index 0000000..cae2245
--- /dev/null
+++ b/s2counter_core/src/main/scala/org/apache/s2graph/counter/helper/CounterAdmin.scala
@@ -0,0 +1,113 @@
+package org.apache.s2graph.counter.helper
+
+import com.typesafe.config.Config
+import org.apache
+import org.apache.s2graph.core.Graph
+import org.apache.s2graph.core.mysqls.Label
+import org.apache.s2graph.counter
+import org.apache.s2graph.counter.config.S2CounterConfig
+import org.apache.s2graph.counter.core.{RankingCounter, ExactCounter}
+import org.apache.s2graph.counter.core.v1.{RankingStorageRedis, ExactStorageAsyncHBase}
+import org.apache.s2graph.counter.core.v2.{RankingStorageGraph, ExactStorageGraph, GraphOperation}
+import org.apache.s2graph.counter.models.{Counter, CounterModel}
+import play.api.libs.json.Json
+
+import scala.util.Try
+
+class CounterAdmin(config: Config) {
+  val s2config = new S2CounterConfig(config)
+  val counterModel = new CounterModel(config)
+  val graphOp = new GraphOperation(config)
+  val s2graph = new Graph(config)(scala.concurrent.ExecutionContext.global)
+  val storageManagement = new org.apache.s2graph.core.Management(s2graph)
+
+  def setupCounterOnGraph(): Unit = {
+    // create s2counter service
+    val service = "s2counter"
+    storageManagement.createService(service, s2config.HBASE_ZOOKEEPER_QUORUM, s"$service-${config.getString("phase")}", 1, None, "gz")
+    // create bucket label
+    val label = "s2counter_topK_bucket"
+    if (Label.findByName(label, useCache = false).isEmpty) {
+      val strJs =
+        s"""
+           |{
+           |  "label": "$label",
+           |  "srcServiceName": "s2counter",
+           |  "srcColumnName": "dimension",
+           |  "srcColumnType": "string",
+           |  "tgtServiceName": "s2counter",
+           |  "tgtColumnName": "bucket",
+           |  "tgtColumnType": "string",
+           |  "indices": [
+           |    {"name": "time", "propNames": ["time_unit", "date_time"]}
+           |  ],
+           |  "props": [
+           |    {"name": "time_unit", "dataType": "string", "defaultValue": ""},
+           |    {"name": "date_time", "dataType": "long", "defaultValue": 0}
+           |  ],
+           |  "hTableName": "s2counter_60",
+           |  "hTableTTL": 5184000
+           |}
+         """.stripMargin
+      graphOp.createLabel(Json.parse(strJs))
+    }
+  }
+
+  def createCounter(policy: Counter): Unit = {
+    val newPolicy = policy.copy(hbaseTable = Some(makeHTableName(policy)))
+    prepareStorage(newPolicy)
+    counterModel.createServiceAction(newPolicy)
+  }
+
+  def deleteCounter(service: String, action: String): Option[Try[Unit]] = {
+    for {
+      policy <- counterModel.findByServiceAction(service, action, useCache = false)
+    } yield {
+      Try {
+        exactCounter(policy).destroy(policy)
+        if (policy.useRank) {
+          rankingCounter(policy).destroy(policy)
+        }
+        counterModel.deleteServiceAction(policy)
+      }
+    }
+  }
+
+  def prepareStorage(policy: Counter): Unit = {
+    if (policy.rateActionId.isEmpty) {
+      // when a rate action is defined, the exact counter is not used
+      exactCounter(policy).prepare(policy)
+    }
+    if (policy.useRank) {
+      rankingCounter(policy).prepare(policy)
+    }
+  }
+
+  def prepareStorage(policy: Counter, version: Byte): Unit = {
+    // prepare storage for an explicit version instead of policy.version
+    prepareStorage(policy.copy(version = version))
+  }
+
+  private val exactCounterMap = Map(
+    counter.VERSION_1 -> new ExactCounter(config, new ExactStorageAsyncHBase(config)),
+    counter.VERSION_2 -> new ExactCounter(config, new ExactStorageGraph(config))
+  )
+  private val rankingCounterMap = Map(
+    apache.s2graph.counter.VERSION_1 -> new RankingCounter(config, new RankingStorageRedis(config)),
+    apache.s2graph.counter.VERSION_2 -> new RankingCounter(config, new RankingStorageGraph(config))
+  )
+
+  private val tablePrefixMap = Map(
+    apache.s2graph.counter.VERSION_1 -> "s2counter",
+    apache.s2graph.counter.VERSION_2 -> "s2counter_v2"
+  )
+
+  def exactCounter(version: Byte): ExactCounter = exactCounterMap(version)
+  def exactCounter(policy: Counter): ExactCounter = exactCounter(policy.version)
+  def rankingCounter(version: Byte): RankingCounter = rankingCounterMap(version)
+  def rankingCounter(policy: Counter): RankingCounter = rankingCounter(policy.version)
+
+  def makeHTableName(policy: Counter): String = {
+    Seq(tablePrefixMap(policy.version), policy.service, policy.ttl) ++ policy.dailyTtl mkString "_"
+  }
+}
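[Editor's note] makeHTableName simply underscore-joins the version-specific prefix, the service, the ttl, and the optional dailyTtl. For a hypothetical v2 policy (all values illustrative):

    // service = "talk", ttl = 172800, dailyTtl = Some(30)
    val parts = Seq("s2counter_v2", "talk", 172800) ++ Some(30)
    parts.mkString("_") // "s2counter_v2_talk_172800_30"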
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2counter_core/src/main/scala/org/apache/s2graph/counter/helper/DistributedScanner.scala
----------------------------------------------------------------------
diff --git a/s2counter_core/src/main/scala/org/apache/s2graph/counter/helper/DistributedScanner.scala b/s2counter_core/src/main/scala/org/apache/s2graph/counter/helper/DistributedScanner.scala
new file mode 100644
index 0000000..c06cabe
--- /dev/null
+++ b/s2counter_core/src/main/scala/org/apache/s2graph/counter/helper/DistributedScanner.scala
@@ -0,0 +1,70 @@
+package org.apache.s2graph.counter.helper
+
+import java.util
+import java.util.Comparator
+
+import com.google.common.primitives.SignedBytes
+import org.apache.hadoop.hbase.client._
+import org.apache.hadoop.hbase.util.Bytes
+
+object DistributedScanner {
+  val BUCKET_BYTE_SIZE = Bytes.SIZEOF_BYTE
+
+  def getRealRowKey(result: Result): Array[Byte] = {
+    result.getRow.drop(BUCKET_BYTE_SIZE)
+  }
+}
+
+class DistributedScanner(table: Table, scan: Scan) extends AbstractClientScanner {
+  import DistributedScanner._
+
+  private val BYTE_MAX = BigInt(256)
+
+  private[helper] val scanners = {
+    for {
+      i <- 0 until BYTE_MAX.pow(BUCKET_BYTE_SIZE).toInt
+    } yield {
+      val bucketBytes: Array[Byte] = Bytes.toBytes(i).takeRight(BUCKET_BYTE_SIZE)
+      val newScan = new Scan(scan).setStartRow(bucketBytes ++ scan.getStartRow).setStopRow(bucketBytes ++ scan.getStopRow)
+      table.getScanner(newScan)
+    }
+  }
+
+  val resultCache = new util.TreeMap[Result, java.util.Iterator[Result]](new Comparator[Result] {
+    val comparator = SignedBytes.lexicographicalComparator()
+    override def compare(o1: Result, o2: Result): Int = {
+      comparator.compare(getRealRowKey(o1), getRealRowKey(o2))
+    }
+  })
+
+  lazy val initialized = {
+    val iterators = scanners.map(_.iterator()).filter(_.hasNext)
+    iterators.foreach { it =>
+      resultCache.put(it.next(), it)
+    }
+    iterators.nonEmpty
+  }
+
+  override def next(): Result = {
+    if (initialized) {
+      Option(resultCache.pollFirstEntry()).map { entry =>
+        val it = entry.getValue
+        if (it.hasNext) {
+          // fill cache
+          resultCache.put(it.next(), it)
+        }
+        entry.getKey
+      }.orNull
+    } else {
+      null
+    }
+  }
+
+  override def close(): Unit = {
+    for {
+      scanner <- scanners
+    } {
+      scanner.close()
+    }
+  }
+}
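[Editor's note] DistributedScanner reads salted HBase row keys: it fans out one scan per possible salt byte (256 of them) and k-way-merges the per-bucket results, using the TreeMap as a priority queue ordered by the unsalted ("real") row key. The same merge idea over plain sorted iterators, as a self-contained sketch:

    import scala.collection.mutable

    // K-way merge of already-sorted iterators; DistributedScanner does this with
    // a TreeMap keyed by the unsalted row key instead of a priority queue.
    def mergeSorted[T](iterators: Seq[Iterator[T]])(implicit ord: Ordering[T]): Iterator[T] = {
      val heap = mutable.PriorityQueue.empty[(T, Iterator[T])](
        Ordering.by[(T, Iterator[T]), T](_._1)(ord.reverse)) // min-heap on the head element
      iterators.filter(_.hasNext).foreach(it => heap.enqueue((it.next(), it)))
      new Iterator[T] {
        def hasNext: Boolean = heap.nonEmpty
        def next(): T = {
          val (head, it) = heap.dequeue()               // smallest head across all sources
          if (it.hasNext) heap.enqueue((it.next(), it)) // refill from the same source
          head
        }
      }
    }

    // e.g. mergeSorted(Seq(Iterator(1, 4), Iterator(2, 3))).toList == List(1, 2, 3, 4)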
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2counter_core/src/main/scala/org/apache/s2graph/counter/helper/HashShardingJedis.scala
----------------------------------------------------------------------
diff --git a/s2counter_core/src/main/scala/org/apache/s2graph/counter/helper/HashShardingJedis.scala b/s2counter_core/src/main/scala/org/apache/s2graph/counter/helper/HashShardingJedis.scala
new file mode 100644
index 0000000..f970c34
--- /dev/null
+++ b/s2counter_core/src/main/scala/org/apache/s2graph/counter/helper/HashShardingJedis.scala
@@ -0,0 +1,153 @@
+package org.apache.s2graph.counter.helper
+
+import com.typesafe.config.Config
+import org.apache.s2graph.counter.config.S2CounterConfig
+import org.apache.s2graph.counter.util.Hashes
+import org.slf4j.LoggerFactory
+import redis.clients.jedis.exceptions.JedisException
+import redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig}
+
+class HashShardingJedis(config: Config) {
+  lazy val s2config = new S2CounterConfig(config)
+
+  private val log = LoggerFactory.getLogger(getClass)
+
+  val poolConfig = new JedisPoolConfig()
+  poolConfig.setMaxTotal(150)
+  poolConfig.setMaxIdle(50)
+  poolConfig.setMaxWaitMillis(200)
+
+  val jedisPools = s2config.REDIS_INSTANCES.map { case (host, port) =>
+    new JedisPool(poolConfig, host, port)
+  }
+  val jedisPoolSize = jedisPools.size
+
+  def getJedisPool(idx: Int): JedisPool = {
+    if (idx >= jedisPoolSize)
+      null
+    else
+      jedisPools(idx)
+  }
+
+  def getJedisPoolWithBucketname2(bucket: String): JedisPool = {
+    val hashedValue = Hashes.murmur3(bucket)
+    val idx = hashedValue % jedisPoolSize
+    getJedisPool(idx)
+  }
+
+  def getJedisPoolWithBucketname(bucket: String): (JedisPool, JedisPool) = {
+    val hashedValue = Hashes.murmur3(bucket)
+    val idx = hashedValue % jedisPoolSize
+    val secondaryIdx = if (jedisPoolSize <= 1) {
+      throw new Exception("too small sharding pool <= 1")
+    } else {
+      // pick a second index distinct from the primary by skipping over it
+      val newIdx = (hashedValue / jedisPoolSize) % (jedisPoolSize - 1)
+      if (newIdx < idx) {
+        newIdx
+      } else {
+        newIdx + 1
+      }
+    }
+    (getJedisPool(idx), getJedisPool(secondaryIdx))
+  }
+
+  def doBlockWithJedisInstace(f: Jedis => Any, fallBack: => Any, jedis: Jedis) = {
+    try {
+      f(jedis)
+    }
+    catch {
+      case e: JedisException =>
+        fallBack
+    }
+  }
+
+  def doBlockWithBucketName(f: Jedis => Any, fallBack: => Any, bucket: String) = {
+    // val (jedis_pool1, jedis_pool2) = getJedisPoolWithBucketname(bucket)
+    val jedis_pool1 = getJedisPoolWithBucketname2(bucket)
+    // if (jedis_pool1 != null && jedis_pool2 != null) {
+    if (jedis_pool1 != null) {
+      var jedis1: Jedis = null
+      // var jedis2: Jedis = null
+      try {
+        jedis1 = jedis_pool1.getResource()
+        // jedis2 = jedis_pool2.getResource()
+        log.info(s">> Jedis Pool Active Num : ${jedis_pool1.getNumActive}")
+
+        /* val f1 = Future(f(jedis1))
+         * val f2 = Future(f(jedis2))
+         * val mixedFuture = Future.sequence(List(f1, f2)) */
+
+        val r1 = f(jedis1)
+        // val r2 = f(jedis2)
+
+        r1
+      }
+      catch {
+        case e: JedisException =>
+          // exception caught: return the broken connection and fall back
+          jedis_pool1.returnBrokenResource(jedis1)
+          // jedis_pool2.returnBrokenResource(jedis2)
+
+          jedis1 = null
+          // jedis2 = null
+          fallBack
+      }
+      finally {
+        if (jedis1 != null) jedis_pool1.returnResource(jedis1)
+        // if (jedis2 != null) jedis_pool2.returnResource(jedis2)
+      }
+    }
+    else {
+      // no pool resolved for this bucket: fall back
+      fallBack
+    }
+  }
+
+  def doBlockWithKey[T](key: String)(f: Jedis => T)(fallBack: => T) = {
+    val (jedis_pool1, jedis_pool2) = getJedisPoolWithBucketname(key)
+    if (jedis_pool1 != null && jedis_pool2 != null) {
+      var jedis1: Jedis = null
+      var jedis2: Jedis = null
+      try {
+        jedis1 = jedis_pool1.getResource()
+        jedis2 = jedis_pool2.getResource()
+
+        /* val f1 = Future(f(jedis1))
+         * val f2 = Future(f(jedis2))
+         * val mixedFuture = Future.sequence(List(f1, f2)) */
+
+        val r1 = f(jedis1)
+        // val r2 = f(jedis2)
+
+        r1
+      }
+      catch {
+        case e: JedisException =>
+          // exception caught: return the broken connections and fall back
+          jedis_pool1.returnBrokenResource(jedis1)
+          jedis_pool2.returnBrokenResource(jedis2)
+
+          jedis1 = null
+          jedis2 = null
+          fallBack
+      }
+      finally {
+        if (jedis1 != null) jedis_pool1.returnResource(jedis1)
+        if (jedis2 != null) jedis_pool2.returnResource(jedis2)
+      }
+    }
+    else {
+      // no pool pair resolved for this key: fall back
+      fallBack
+    }
+  }
+}
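[Editor's note] getJedisPoolWithBucketname picks a primary pool by hash modulo pool count and derives a distinct secondary pool from the same hash, so doBlockWithKey requires at least two configured Redis instances. A hypothetical caller, assuming a loaded Typesafe Config in scope and a sorted-set ranking stored in Redis (key and member names are illustrative):

    // Read a ranking member's score through the sharded pool,
    // falling back to 0.0 when the shard lookup fails.
    val sharded = new HashShardingJedis(config)
    val score: Double = sharded.doBlockWithKey("user:123") { jedis =>
      Option(jedis.zscore("ranking:daily", "item-42")).map(_.toDouble).getOrElse(0.0)
    } {
      0.0 // fallback value
    }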
