// http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2counter_core/src/main/scala/org/apache/s2graph/counter/helper/Management.scala
// (patch residue: new file mode 100644, index 0000000..c507075)
package org.apache.s2graph.counter.helper

import com.typesafe.config.Config
import org.apache.hadoop.hbase.client.{Admin, ConnectionFactory, Durability}
import org.apache.hadoop.hbase.io.compress.Compression
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding
import org.apache.hadoop.hbase.regionserver.BloomType
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HTableDescriptor, TableName}
import org.slf4j.LoggerFactory
import redis.clients.jedis.ScanParams

import scala.collection.JavaConversions._
import scala.util.Random

/**
 * Administrative helpers: inspect / alter / create HBase tables addressed by a ZooKeeper
 * quorum string, and iterate over the keys of every Redis shard known to `withRedis`.
 */
class Management(config: Config) {
  val withRedis = new HashShardingJedis(config)

  val log = LoggerFactory.getLogger(this.getClass)

  /**
   * Runs `op` against a freshly opened `Admin` and always closes both the admin handle and
   * the connection it was created from, so the helpers below do not leak connections.
   */
  private def withAdmin[T](zkAddr: String)(op: Admin => T): T = {
    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", zkAddr)
    val conn = ConnectionFactory.createConnection(conf)
    try {
      val admin = conn.getAdmin
      try op(admin) finally admin.close()
    } finally {
      conn.close()
    }
  }

  /** Prints every column family of `tableName` together with its key/value attributes. */
  def describe(zkAddr: String, tableName: String): Unit = withAdmin(zkAddr) { admin =>
    val table = admin.getTableDescriptor(TableName.valueOf(tableName))

    table.getColumnFamilies.foreach { cf =>
      println(s"columnFamily: ${cf.getNameAsString}")
      cf.getValues.foreach { case (k, v) =>
        println(s"${Bytes.toString(k.get())} ${Bytes.toString(v.get())}")
      }
    }
  }

  /**
   * Sets the time-to-live of column family `cfName` on `tableName`.
   *
   * @throws IllegalArgumentException if the table has no such column family
   */
  def setTTL(zkAddr: String, tableName: String, cfName: String, ttl: Int): Unit = withAdmin(zkAddr) { admin =>
    val tableNameObj = TableName.valueOf(tableName)
    val table = admin.getTableDescriptor(tableNameObj)

    // getFamily returns null for an unknown family; fail with a clear message instead of an NPE.
    val cf = Option(table.getFamily(cfName.getBytes)).getOrElse {
      throw new IllegalArgumentException(s"column family $cfName does not exist in table $tableName")
    }
    cf.setTimeToLive(ttl)

    admin.modifyColumn(tableNameObj, cf)
  }

  /**
   * Opens a new connection for `zkAddr` and returns its `Admin`.
   *
   * NOTE: every call creates a new connection; the caller owns the returned handle and is
   * responsible for closing it (and the connection behind it). The methods of this class use
   * the self-closing `withAdmin` instead.
   */
  def getAdmin(zkAddr: String): Admin = {
    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", zkAddr)
    val conn = ConnectionFactory.createConnection(conf)
    conn.getAdmin
  }

  /** Returns whether `tableName` exists on the cluster behind `zkAddr`. */
  def tableExists(zkAddr: String, tableName: String): Boolean =
    withAdmin(zkAddr)(_.tableExists(TableName.valueOf(tableName)))

  /**
   * Creates `tableName` with one column family per entry of `cfs`. The table is pre-split
   * into (number of region servers * `regionMultiplier`) regions when that count exceeds 1.
   * Failures are logged and rethrown.
   */
  def createTable(zkAddr: String, tableName: String, cfs: List[String], regionMultiplier: Int): Unit = {
    log.info(s"create table: $tableName on $zkAddr, $cfs, $regionMultiplier")
    withAdmin(zkAddr) { admin =>
      val regionCount = admin.getClusterStatus.getServersSize * regionMultiplier
      try {
        val desc = new HTableDescriptor(TableName.valueOf(tableName))
        desc.setDurability(Durability.ASYNC_WAL)
        cfs.foreach { cf =>
          val columnDesc = new HColumnDescriptor(cf)
            .setCompressionType(Compression.Algorithm.LZ4)
            .setBloomFilterType(BloomType.ROW)
            .setDataBlockEncoding(DataBlockEncoding.FAST_DIFF)
            .setMaxVersions(1)
            .setMinVersions(0)
            .setBlocksize(32768)
            .setBlockCacheEnabled(true)
          desc.addFamily(columnDesc)
        }

        if (regionCount <= 1) admin.createTable(desc)
        else admin.createTable(desc, getStartKey(regionCount), getEndKey(regionCount), regionCount)
      } catch {
        case e: Throwable =>
          log.error(s"$zkAddr, $tableName failed with $e", e)
          throw e
      }
    }
  }

  // we only use murmur hash to distribute row key.
  private def getStartKey(regionCount: Int) = {
    Bytes.toBytes(Int.MaxValue / regionCount)
  }

  private def getEndKey(regionCount: Int) = {
    Bytes.toBytes(Int.MaxValue / regionCount * (regionCount - 1))
  }

  /**
   * Iterates over the keys of all Redis shards using SCAN. One cursor is tracked per shard;
   * each fetch picks a random shard that still has an open cursor and drops the shard once
   * Redis reports cursor "0".
   */
  case class RedisScanIterator(scanParams: ScanParams = new ScanParams().count(100)) extends Iterator[String] {
    val nextCursorId: collection.mutable.Map[Int, String] = collection.mutable.Map.empty[Int, String]
    var innerIterator: Iterator[String] = _

    // every shard starts at cursor "0"
    (0 until withRedis.jedisPoolSize).foreach(i => nextCursorId.put(i, "0"))

    /** Fetches the next SCAN batch from a random unfinished shard (empty batch if none left). */
    def callScan(): Unit = {
      if (nextCursorId.nonEmpty) {
        val idx = Random.shuffle(nextCursorId.keys).head
        val cursorId = nextCursorId(idx)
        val pool = withRedis.getJedisPool(idx)
        val conn = pool.getResource
        try {
          val result = conn.scan(cursorId, scanParams)
          result.getStringCursor match {
            case "0" =>
              log.debug(s"end scan: idx: $idx, cursor: $cursorId")
              nextCursorId.remove(idx)
            case x: String =>
              nextCursorId.put(idx, x)
          }
          innerIterator = result.getResult.toIterator
        } finally {
          pool.returnResource(conn)
        }
      }
      else {
        innerIterator = List.empty[String].toIterator
      }
    }

    // initialize
    callScan()

    override def hasNext: Boolean = {
      // A SCAN call may legitimately return an empty batch while its cursor is still open,
      // so keep fetching until a batch has elements or every shard's cursor is finished.
      while (!innerIterator.hasNext && nextCursorId.nonEmpty) {
        callScan()
      }
      innerIterator.hasNext
    }

    override def next(): String = innerIterator.next()
  }
}
// http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2counter_core/src/main/scala/org/apache/s2graph/counter/helper/WithHBase.scala
// http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2counter_core/src/main/scala/org/apache/s2graph/counter/helper/WithRedis.scala
// (patch residue: new files, index 0000000..108c47a and 0000000..a7b99c8)
package org.apache.s2graph.counter.helper {

  import com.stumbleupon.async.{Callback, Deferred}
  import com.typesafe.config.Config
  import org.apache.hadoop.hbase.client._
  import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
  import org.apache.s2graph.counter.config.S2CounterConfig
  import org.apache.s2graph.counter.util.Hashes
  import org.hbase.async.HBaseClient
  import org.slf4j.LoggerFactory
  import redis.clients.jedis.exceptions.JedisException
  import redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig}

  import scala.concurrent.{Future, Promise}
  import scala.util.Try

  /**
   * Loan-pattern access to HBase tables over one lazily created shared connection.
   * All "hbase" entries of the counter configuration are copied into the HBase client config.
   */
  class WithHBase(config: Config) {
    lazy val logger = LoggerFactory.getLogger(this.getClass)
    lazy val s2config = new S2CounterConfig(config)

    lazy val zkQuorum = s2config.HBASE_ZOOKEEPER_QUORUM
    lazy val defaultTableName = s2config.HBASE_TABLE_NAME

    logger.info(s"$zkQuorum, $defaultTableName")

    val hbaseConfig = HBaseConfiguration.create()
    s2config.getConfigMap("hbase").foreach { case (k, v) =>
      hbaseConfig.set(k, v)
    }

//  lazy val conn: HConnection = HConnectionManager.createConnection(hbaseConfig)
    lazy val conn: Connection = ConnectionFactory.createConnection(hbaseConfig)

    val writeBufferSize = 1024 * 1024 * 2 // 2MB

//  def apply[T](op: Table => T): Try[T] = {
//    Try {
//      val table = conn.getTable(TableName.valueOf(defaultTableName))
//      // do not keep failed operation in writer buffer
//      table.setWriteBufferSize(writeBufferSize)
//      try {
//        op(table)
//      } catch {
//        case e: Throwable =>
//          logger.error(s"Operation to table($defaultTableName) is failed: ${e.getMessage}")
//          throw e
//      } finally {
//        table.close()
//      }
//    }
//  }

    /**
     * Opens `tableName`, applies `op` and always closes the table again.
     *
     * @return the result of `op`, or a `Failure` holding whatever was thrown
     */
    def apply[T](tableName: String)(op: Table => T): Try[T] = Try {
      val table = conn.getTable(TableName.valueOf(tableName))
      // do not keep failed operation in writer buffer
      table.setWriteBufferSize(writeBufferSize)
      try {
        op(table)
      } catch {
        case ex: Exception =>
          logger.error(s"$ex: Operation to table($tableName) is failed")
          throw ex
      } finally {
        table.close()
      }
    }
  }

  /** Bridges asynchbase `Deferred` results into Scala `Future`s over one lazily created client. */
  case class WithAsyncHBase(config: Config) {
    lazy val logger = LoggerFactory.getLogger(this.getClass)
    lazy val s2config = new S2CounterConfig(config)

    lazy val zkQuorum = s2config.HBASE_ZOOKEEPER_QUORUM

    val hbaseConfig = HBaseConfiguration.create()
    s2config.getConfigMap("hbase").foreach { case (k, v) =>
      hbaseConfig.set(k, v)
    }

//  lazy val conn: HConnection = HConnectionManager.createConnection(hbaseConfig)
    lazy val client: HBaseClient = new HBaseClient(zkQuorum)

    val writeBufferSize = 1024 * 1024 * 2 // 2MB

    /** Runs `op` on the shared client; the future completes with the deferred's value or error. */
    def apply[T](op: HBaseClient => Deferred[T]): Future[T] = {
      val promise = Promise[T]()

      op(client).addCallback(new Callback[Unit, T] {
        def call(arg: T): Unit = {
          promise.success(arg)
        }
      }).addErrback(new Callback[Unit, Exception] {
        def call(ex: Exception): Unit = {
          promise.failure(ex)
        }
      })
      promise.future
    }
  }

  /** Loan-pattern access to a list of Redis pools, sharded by the murmur3 hash of the key. */
  class WithRedis(config: Config) {
    lazy val s2config = new S2CounterConfig(config)

    private val log = LoggerFactory.getLogger(getClass)

    val poolConfig = new JedisPoolConfig()
    poolConfig.setMaxTotal(150)
    poolConfig.setMaxIdle(50)
    poolConfig.setMaxWaitMillis(200)

    val jedisPools = s2config.REDIS_INSTANCES.map { case (host, port) =>
      new JedisPool(poolConfig, host, port)
    }

    /** Index of the pool responsible for `key`. */
    def getBucketIdx(key: String): Int = {
      Hashes.murmur3(key) % jedisPools.size
    }

    /**
     * Borrows a connection from pool `idx`, applies `f` and hands the connection back.
     * On a `JedisException` the connection is returned as broken and the error is rethrown
     * (and therefore captured in the resulting `Failure`).
     */
    def doBlockWithIndex[T](idx: Int)(f: Jedis => T): Try[T] = Try {
      val pool = jedisPools(idx)

      var jedis: Jedis = null

      try {
        jedis = pool.getResource

        f(jedis)
      }
      catch {
        case e: JedisException =>
          // getResource itself may have thrown, in which case nothing was borrowed and
          // there is nothing to hand back; never pass null to the pool.
          if (jedis != null) {
            pool.returnBrokenResource(jedis)
          }

          jedis = null
          throw e
      }
      finally {
        if (jedis != null) {
          pool.returnResource(jedis)
        }
      }
    }

    /** Same as `doBlockWithIndex`, choosing the pool from `key`. */
    def doBlockWithKey[T](key: String)(f: Jedis => T): Try[T] = {
      doBlockWithIndex(getBucketIdx(key))(f)
    }
  }
}

// http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2counter_core/src/main/scala/org/apache/s2graph/counter/models/CachedDBModel.scala
// http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2counter_core/src/main/scala/org/apache/s2graph/counter/models/Counter.scala
// (patch residue: new files, index 0000000..6e06caf and 0000000..467e110)
package org.apache.s2graph.counter.models {

  import com.typesafe.config.Config
  import org.apache.s2graph.counter.config.S2CounterConfig
  import org.apache.s2graph.counter.util.{CollectionCache, CollectionCacheConfig}
  import scalikejdbc._

  /** Mix-in giving a model an implicit auto session and a lazily built single-row cache. */
  trait CachedDBModel[T] {
    implicit val s = AutoSession

    val cacheConfig: CollectionCacheConfig
    lazy val cache = new CollectionCache[Option[T]](cacheConfig)
  }

  /**
   * One counter policy row.
   *
   * `dimension` is a comma separated list; `intervalUnit` is a comma separated list of
   * interval codes and defaults to "t", "M", "d", "H" when absent.
   */
  case class Counter(id: Int, useFlag: Boolean, version: Byte, service: String, action: String,
                     itemType: Counter.ItemType.ItemType, autoComb: Boolean, dimension: String,
                     useProfile: Boolean, bucketImpId: Option[String],
                     useRank: Boolean,
                     ttl: Int, dailyTtl: Option[Int], hbaseTable: Option[String],
                     intervalUnit: Option[String],
                     rateActionId: Option[Int], rateBaseId: Option[Int], rateThreshold: Option[Int]) {
    val intervals: Array[String] = intervalUnit.map(s => s.split(',')).getOrElse(Array("t", "M", "d", "H"))
    val dimensionSp = if (dimension.isEmpty) Array.empty[String] else dimension.split(',').sorted

    /**
     * With `autoComb`: every combination of up to 4 of the sorted dimension keys (including
     * the empty one). Otherwise: each configured entry split on '.', or a single empty
     * array when no dimension is configured.
     */
    val dimensionList: List[Array[String]] = {
      if (autoComb) {
        for {
          i <- (0 to math.min(4, dimensionSp.length)).toList
          combines <- dimensionSp.combinations(i)
        } yield {
          combines
        }
      } else if (dimensionSp.isEmpty) {
        List(Array.empty[String])
      } else {
        dimensionSp.toList.map(sp => sp.split('.'))
      }
    }

    val dimensionSet: Set[Set[String]] = dimensionList.map(_.toSet).toSet

    // rate counter: both ids set and different; trend counter: both ids set and equal
    val isRateCounter: Boolean = rateActionId.isDefined && rateBaseId.isDefined && rateActionId != rateBaseId
    val isTrendCounter: Boolean = rateActionId.isDefined && rateBaseId.isDefined && rateActionId == rateBaseId
  }

  object Counter extends SQLSyntaxSupport[Counter] {
    object ItemType extends Enumeration {
      type ItemType = Value
      val INT, LONG, STRING, BLOB = Value
    }

    def apply(c: SyntaxProvider[Counter])(rs: WrappedResultSet): Counter = apply(c.resultName)(rs)

    /** Builds a `Counter` from the current row of `rs`. */
    def apply(r: ResultName[Counter])(rs: WrappedResultSet): Counter = {
      lazy val itemType = Counter.ItemType(rs.int(r.itemType))
      Counter(rs.int(r.id), rs.boolean(r.useFlag), rs.byte(r.version), rs.string(r.service), rs.string(r.action),
        itemType, rs.boolean(r.autoComb), rs.string(r.dimension),
        rs.boolean(r.useProfile), rs.stringOpt(r.bucketImpId),
        rs.boolean(r.useRank),
        rs.int(r.ttl), rs.intOpt(r.dailyTtl), rs.stringOpt(r.hbaseTable), rs.stringOpt(r.intervalUnit),
        rs.intOpt(r.rateActionId), rs.intOpt(r.rateBaseId), rs.intOpt(r.rateThreshold))
    }

    /** Builds a not-yet-persisted `Counter`; its id is -1. */
    def apply(useFlag: Boolean, version: Byte, service: String, action: String, itemType: Counter.ItemType.ItemType,
              autoComb: Boolean, dimension: String, useProfile: Boolean = false, bucketImpId: Option[String] = None,
              useRank: Boolean = false, ttl: Int = 259200, dailyTtl: Option[Int] = None,
              hbaseTable: Option[String] = None, intervalUnit: Option[String] = None,
              rateActionId: Option[Int] = None, rateBaseId: Option[Int] = None, rateThreshold: Option[Int] = None): Counter = {
      Counter(-1, useFlag, version, service, action, itemType, autoComb, dimension,
        useProfile, bucketImpId,
        useRank, ttl, dailyTtl, hbaseTable,
        intervalUnit, rateActionId, rateBaseId, rateThreshold)
    }
  }

  /** Cached read access and write operations for `Counter` rows. */
  class CounterModel(config: Config) extends CachedDBModel[Counter] {
    private lazy val s2Config = new S2CounterConfig(config)
    // enable negative cache
    override val cacheConfig: CollectionCacheConfig =
      new CollectionCacheConfig(s2Config.CACHE_MAX_SIZE, s2Config.CACHE_TTL_SECONDS,
        negativeCache = true, s2Config.CACHE_NEGATIVE_TTL_SECONDS)

    val c = Counter.syntax("c")
    val r = c.result

    val multiCache = new CollectionCache[Seq[Counter]](cacheConfig)

    /** Evaluates a single-row `query` through `cache` under `key`, or directly when `useCache` is false. */
    private def lookupOne(key: String, useCache: Boolean)(query: => Option[Counter]): Option[Counter] =
      if (useCache) cache.withCache(key)(query) else query

    /** Evaluates a multi-row `query` through `multiCache` under `key`, or directly when `useCache` is false. */
    private def lookupMany(key: String, useCache: Boolean)(query: => Seq[Counter]): Seq[Counter] =
      if (useCache) multiCache.withCache(key)(query) else query

    def findById(id: Int, useCache: Boolean = true): Option[Counter] =
      lookupOne(s"_id:$id", useCache) {
        withSQL {
          selectFrom(Counter as c).where.eq(c.id, id).and.eq(c.useFlag, 1)
        }.map(Counter(c)).single().apply()
      }

    def findByServiceAction(service: String, action: String, useCache: Boolean = true): Option[Counter] =
      lookupOne(s"$service.$action", useCache) {
        withSQL {
          selectFrom(Counter as c).where.eq(c.service, service).and.eq(c.action, action).and.eq(c.useFlag, 1)
        }.map(Counter(c)).single().apply()
      }

    def findByRateActionId(rateActionId: Int, useCache: Boolean = true): Seq[Counter] =
      lookupMany(s"_rate_action_id.$rateActionId", useCache) {
        withSQL {
          selectFrom(Counter as c).where.eq(c.rateActionId, rateActionId).and.ne(c.rateBaseId, rateActionId).and.eq(c.useFlag, 1)
        }.map(Counter(c)).list().apply()
      }

    def findByRateBaseId(rateBaseId: Int, useCache: Boolean = true): Seq[Counter] =
      lookupMany(s"_rate_base_id.$rateBaseId", useCache) {
        withSQL {
          selectFrom(Counter as c).where.eq(c.rateBaseId, rateBaseId).and.ne(c.rateActionId, rateBaseId).and.eq(c.useFlag, 1)
        }.map(Counter(c)).list().apply()
      }

    def findByTrendActionId(trendActionId: Int, useCache: Boolean = true): Seq[Counter] =
      lookupMany(s"_trend_action_id.$trendActionId", useCache) {
        withSQL {
          selectFrom(Counter as c).where.eq(c.rateActionId, trendActionId).and.eq(c.rateBaseId, trendActionId).and.eq(c.useFlag, 1)
        }.map(Counter(c)).list().apply()
      }

    /** Inserts `policy` as a new row (its `id` is not written). */
    def createServiceAction(policy: Counter): Unit = {
      withSQL {
        val c = Counter.column
        insert.into(Counter).namedValues(
          c.useFlag -> policy.useFlag,
          c.version -> policy.version,
          c.service -> policy.service,
          c.action -> policy.action,
          c.itemType -> policy.itemType.id,
          c.autoComb -> policy.autoComb,
          c.dimension -> policy.dimension,
          c.useProfile -> policy.useProfile,
          c.bucketImpId -> policy.bucketImpId,
          c.useRank -> policy.useRank,
          c.ttl -> policy.ttl,
          c.dailyTtl -> policy.dailyTtl,
          c.hbaseTable -> policy.hbaseTable,
          c.intervalUnit -> policy.intervalUnit,
          c.rateActionId -> policy.rateActionId,
          c.rateBaseId -> policy.rateBaseId,
          c.rateThreshold -> policy.rateThreshold
        )
      }.update().apply()
    }

    /** Updates the mutable columns of the row identified by `policy.id`. */
    def updateServiceAction(policy: Counter): Unit = {
      withSQL {
        val c = Counter.column
        update(Counter).set(
          c.autoComb -> policy.autoComb,
          c.dimension -> policy.dimension,
          c.useProfile -> policy.useProfile,
          c.bucketImpId -> policy.bucketImpId,
          c.useRank -> policy.useRank,
          c.intervalUnit -> policy.intervalUnit,
          c.rateActionId -> policy.rateActionId,
          c.rateBaseId -> policy.rateBaseId,
          c.rateThreshold -> policy.rateThreshold
        ).where.eq(c.id, policy.id)
      }.update().apply()
    }

    /** Soft delete: renames the action with a "deleted_<millis>_" prefix and clears `useFlag`. */
    def deleteServiceAction(policy: Counter): Unit = {
      withSQL {
        val c = Counter.column
        update(Counter).set(
          c.action -> s"deleted_${System.currentTimeMillis()}_${policy.action}",
          c.useFlag -> false
        ).where.eq(c.id, policy.id)
      }.update().apply()
    }
  }
}
// http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2counter_core/src/main/scala/org/apache/s2graph/counter/models/DBModel.scala ---------------------------------------------------------------------- diff --git
// a/s2counter_core/src/main/scala/org/apache/s2graph/counter/models/DBModel.scala b/s2counter_core/src/main/scala/org/apache/s2graph/counter/models/DBModel.scala
// (patch residue: new file mode 100644, index 0000000..1757a7f)
package org.apache.s2graph.counter.models {

  import com.typesafe.config.Config
  import org.apache.s2graph.counter.config.S2CounterConfig
  import scalikejdbc._

  /** One-time setup of the default scalikejdbc connection pool. */
  object DBModel {
    // Read outside the lock below, so it must be volatile for the unlocked check to be
    // guaranteed to observe a write made by another thread.
    @volatile private var initialized = false

    /**
     * Loads the configured JDBC driver and registers the singleton connection pool.
     * Only the first call has an effect; later calls return immediately.
     */
    def initialize(config: Config): Unit = {
      if (!initialized) {
        this synchronized {
          if (!initialized) {
            val s2Config = new S2CounterConfig(config)
            Class.forName(s2Config.DB_DEFAULT_DRIVER)
            val settings = ConnectionPoolSettings(initialSize = 0, maxSize = 10, connectionTimeoutMillis = 5000L, validationQuery = "select 1;")

            ConnectionPool.singleton(s2Config.DB_DEFAULT_URL, s2Config.DB_DEFAULT_USER, s2Config.DB_DEFAULT_PASSWORD, settings)
            initialized = true
          }
        }
      }
    }
  }
}

// http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2counter_core/src/main/scala/org/apache/s2graph/counter/package.scala
// (patch residue: new file mode 100644, index 0000000..01bff0c)
package org.apache.s2graph {

  /** Constants and exceptions shared across the counter module. */
  package object counter {
    val VERSION_1: Byte = 1
    val VERSION_2: Byte = 2

    case class MethodNotSupportedException(message: String, cause: Throwable = null) extends Exception(message, cause)
  }
}
// http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2counter_core/src/main/scala/org/apache/s2graph/counter/util/CartesianProduct.scala
// ---------------------------------------------------------------------- diff --git a/s2counter_core/src/main/scala/org/apache/s2graph/counter/util/CartesianProduct.scala b/s2counter_core/src/main/scala/org/apache/s2graph/counter/util/CartesianProduct.scala
// (patch residue: new files CartesianProduct.scala b1712fa, CollectionCache.scala 74b5238,
//  FunctionParser.scala e65910f, Hashes.scala 634b723, ReduceMapValue.scala 037813b)
package org.apache.s2graph.counter.util

import java.net.InetAddress
import java.util.concurrent.TimeUnit

import com.google.common.cache.{Cache, CacheBuilder}
import org.apache.hadoop.hbase.util.Bytes
import org.slf4j.LoggerFactory

import scala.concurrent.{ExecutionContext, Future}
import scala.language.{postfixOps, reflectiveCalls}
import scala.util.hashing.MurmurHash3

/** Cartesian product of a list of lists; the product of no lists is one empty list. */
object CartesianProduct {
  def apply[T](xss: List[List[T]]): List[List[T]] = xss match {
    case Nil => List(Nil)
    case h :: t => for(xh <- h; xt <- apply(t)) yield xh :: xt
  }
}

case class CollectionCacheConfig(maxSize: Int, ttl: Int, negativeCache: Boolean = false, negativeTTL: Int = 600)

/**
 * Size- and time-bounded cache for collection-like values (anything exposing `nonEmpty` /
 * `isEmpty`). Empty results are stored only when `config.negativeCache` is set.
 *
 * NOTE(review): `config.negativeTTL` is not consulted here; every entry expires after
 * `config.ttl` seconds.
 */
class CollectionCache[C <: { def nonEmpty: Boolean; def isEmpty: Boolean } ](config: CollectionCacheConfig) {
  private val cache: Cache[String, C] = CacheBuilder.newBuilder()
    .expireAfterWrite(config.ttl, TimeUnit.SECONDS)
    .maximumSize(config.maxSize)
    // without recordStats() the stats() snapshot used by getStatsString stays all-zero
    .recordStats()
    .build[String, C]()

//  private lazy val cache = new SynchronizedLruMap[String, (C, Int)](config.maxSize)
  private lazy val className = this.getClass.getSimpleName

  private lazy val log = LoggerFactory.getLogger(this.getClass)
  val localHostname = InetAddress.getLocalHost.getHostName

  def size = cache.size
  val maxSize = config.maxSize

  // cache statistics
  def getStatsString: String = {
    s"$localHostname ${cache.stats().toString}"
  }

  /** Stores `r` under `key` when it is non-empty or negative caching is enabled; returns `r`. */
  private def remember(key: String, r: C): C = {
    if (r.nonEmpty || config.negativeCache) {
      cache.put(key, r)
    }
    r
  }

  /** Returns the cached value for `key`, otherwise evaluates `op` and caches its result. */
  def withCache(key: String)(op: => C): C = {
    Option(cache.getIfPresent(key)) match {
      case Some(r) => r
      case None => remember(key, op)
    }
  }

  /** Asynchronous variant of `withCache`; a failed future is not cached. */
  def withCacheAsync(key: String)(op: => Future[C])(implicit ec: ExecutionContext): Future[C] = {
    Option(cache.getIfPresent(key)) match {
      case Some(r) => Future.successful(r)
      case None => op.map(r => remember(key, r))
    }
  }

  def purgeKey(key: String) = {
    cache.invalidate(key)
  }

  def contains(key: String): Boolean = {
    Option(cache.getIfPresent(key)).nonEmpty
  }
}

/**
 * Parses `name`, `name()` or `name(123)` into (name, argument); the argument is "" when
 * absent. Anything else yields `None`.
 */
object FunctionParser {
  val funcRe = """([a-zA-Z_]+)(\((\d+)?\))?""".r

  def apply(str: String): Option[(String, String)] = {
    str match {
      case funcRe(funcName, _, funcArg) =>
        // the digit group is optional and comes back as null when it did not participate
        Some((funcName, Option(funcArg).getOrElse("")))
      case _ =>
        None
    }
  }
}

object Hashes {
  /** Lower-case hex SHA-1 digest of the UTF-8 bytes of `s`. */
  def sha1(s: String): String = {
    val md = java.security.MessageDigest.getInstance("SHA-1")
    Bytes.toHex(md.digest(s.getBytes("UTF-8")))
  }

  // maps negative h to -(h + 1), so Int.MinValue becomes Int.MaxValue without overflow
  private def positiveHash(h: Int): Int = {
    if (h < 0) -1 * (h + 1) else h
  }

  /** Non-negative murmur3 hash of `s`. */
  def murmur3(s: String): Int = {
    val hash = MurmurHash3.stringHash(s)
    positiveHash(hash)
  }
}

/** Merges two maps, combining values of shared keys with `op`; keys only in `m2` are combined with `default`. */
class ReduceMapValue[T, U](op: (U, U) => U, default: U) {
  def apply(m1: Map[T, U], m2: Map[T, U]): Map[T, U] = {
    m1 ++ m2.map { case (k, v) =>
      k -> op(m1.getOrElse(k, default), v)
    }
  }
}
// http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2counter_core/src/main/scala/org/apache/s2graph/counter/util/Retry.scala
// ---------------------------------------------------------------------- diff --git a/s2counter_core/src/main/scala/org/apache/s2graph/counter/util/Retry.scala b/s2counter_core/src/main/scala/org/apache/s2graph/counter/util/Retry.scala
// (patch residue: new file mode 100644, index 0000000..f49a231)
package org.apache.s2graph.counter.util

import scala.annotation.tailrec
import scala.concurrent.{ExecutionContext, Future, Promise, blocking}
import scala.util.control.NonFatal
import scala.util.{Failure, Success, Try}

object Retry {
  /**
   * Evaluates `fn` up to `n` times and returns the first successful result.
   *
   * @param n         maximum number of attempts
   * @param withSleep when true, sleeps `tryCount` seconds before each retry (linear back-off)
   * @param tryCount  number of attempts already made; callers normally leave the default
   * @throws Throwable the last failure once attempts are exhausted, or immediately the cause
   *                   of a [[RetryStopException]] (the exception itself if it has no cause)
   */
  @tailrec
  def apply[T](n: Int, withSleep: Boolean = true, tryCount: Int = 0)(fn: => T): T = {
    Try { fn } match {
      case Success(x) => x
      case Failure(e: RetryStopException) => throw e.underlying
      case Failure(_) if n > 1 =>
        // backoff
        if (withSleep) Thread.sleep(tryCount * 1000L)
        apply(n - 1, withSleep, tryCount + 1)(fn)
      case Failure(e) => throw e
    }
  }
}

object RetryAsync {
  /**
   * Asynchronous counterpart of [[Retry]]: re-evaluates `fn` until its future succeeds or
   * `n` attempts have been made. The returned future completes with the first success, with
   * the last failure, or with the cause of a [[RetryStopException]].
   */
  def apply[T](n: Int, withSleep: Boolean = true, tryCount: Int = 0)(fn: => Future[T])(implicit ex: ExecutionContext): Future[T] = {
    val promise = Promise[T]()
    fn onComplete {
      case Success(x) => promise.success(x)
      case Failure(e: RetryStopException) => promise.failure(e.underlying)
      case Failure(_) if n > 1 =>
        // backoff; this runs on a pool thread, so tell the pool that it is about to block
        if (withSleep) blocking { Thread.sleep(tryCount * 1000L) }
        // The outcome of the retry must be forwarded, otherwise the caller's future would
        // never complete. A synchronous throw from `fn` is turned into a failed future.
        val next =
          try apply(n - 1, withSleep, tryCount + 1)(fn)
          catch { case NonFatal(t) => Future.failed[T](t) }
        promise.completeWith(next)
      case Failure(e) => promise.failure(e)
    }
    promise.future
  }
}

/** Thrown inside a retried block to abort retrying; `cause` (when present) is what callers see. */
class RetryStopException(message: String, cause: Throwable)
  extends Exception(message, cause) {

  def this(message: String) = this(message, null)

  def this(cause: Throwable) = this(cause.toString, cause)

  /** The wrapped cause, or this exception itself when it was created without one. */
  def underlying: Throwable = Option(getCause).getOrElse(this)
}
// http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2counter_core/src/main/scala/org/apache/s2graph/counter/util/SplitBytes.scala ---------------------------------------------------------------------- diff --git
// a/s2counter_core/src/main/scala/org/apache/s2graph/counter/util/SplitBytes.scala b/s2counter_core/src/main/scala/org/apache/s2graph/counter/util/SplitBytes.scala
// (patch residue: new files SplitBytes.scala 34f4cf4, UnitConverter.scala af6bc0c)
package org.apache.s2graph.counter.util

object SplitBytes {
  /**
   * Cuts `bytes` into consecutive chunks of the given `sizes`, followed by one final chunk
   * holding whatever is left (possibly empty).
   *
   * @return `sizes.length + 1` arrays
   * @throws IllegalArgumentException if the sizes add up to more than `bytes.length`
   */
  def apply(bytes: Array[Byte], sizes: Seq[Int]): Seq[Array[Byte]] = {
    val total = sizes.sum
    if (total > bytes.length) {
      throw new IllegalArgumentException(s"sizes.sum bigger than bytes.length $total > ${bytes.length}")
    }

    // Start offset of every chunk, computed without mutable state so the result is also
    // correct when `sizes` is a lazily evaluated Seq.
    val offsets = sizes.scanLeft(0)(_ + _)
    val chunks = offsets.zip(sizes).map { case (start, size) =>
      bytes.slice(start, start + size)
    }
    chunks ++ Seq(bytes.drop(total))
  }
}

/** Timestamp helpers; results are in milliseconds. */
object UnitConverter {
  val HOUR_MILLIS: Int = 60 * 60 * 1000

  /** Interprets `ts` as seconds. */
  def toMillis(ts: Int): Long = {
    ts * 1000L
  }

  /** Values that fit in an Int are treated as seconds; anything larger is taken to be milliseconds already. */
  def toMillis(ts: Long): Long = {
    if (ts <= Int.MaxValue) ts * 1000 else ts
  }

  /** Parses `s` as a Long and converts it like `toMillis(Long)`. */
  def toMillis(s: String): Long = {
    toMillis(s.toLong)
  }

  /** Converts `ts` to milliseconds and truncates it down to a whole hour. */
  def toHours(ts: Long): Long = {
    toMillis(ts) / HOUR_MILLIS * HOUR_MILLIS
  }
}
// http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2counter_core/src/main/scala/s2/config/ConfigFunctions.scala ---------------------------------------------------------------------- diff --git
// File (deleted): s2counter_core/src/main/scala/s2/config/ConfigFunctions.scala
package s2.config

import com.typesafe.config.Config

import scala.collection.JavaConverters._
import scala.reflect.ClassTag

/**
 * Created by hsleep(honeysl...@gmail.com) on 15. 3. 2..
 *
 * Typed lookups with defaults on top of a Typesafe `Config`. Every lookup is
 * echoed to stdout; values of keys containing "password" are masked.
 */
abstract class ConfigFunctions(conf: Config) {
  /**
   * Reads `key` using the accessor that matches the runtime type of `default`.
   *
   * @param key     configuration path
   * @param default returned when `key` is absent, and also when its type is not
   *                one of String, Int, Long, Float, Double or Boolean
   * @return the configured value converted to `T`, or `default`
   */
  def getOrElse[T: ClassTag](key: String, default: T): T = {
    val ret: T =
      if (conf.hasPath(key)) {
        val loaded: Any = default match {
          case _: String => conf.getString(key)
          case _: Int | _: Integer => conf.getInt(key)
          case _: Long => conf.getLong(key)
          // getDouble yields a Double; narrow it so the erased cast to T stays valid for Float
          case _: Float => conf.getDouble(key).toFloat
          case _: Double => conf.getDouble(key)
          case _: Boolean => conf.getBoolean(key)
          case _ => default
        }
        loaded.asInstanceOf[T]
      } else {
        default
      }
    println(s"${this.getClass.getName}: $key -> ${printable(key, ret)}")
    ret
  }

  /**
   * Flattens the sub-tree at `path` into fully qualified key -> string value pairs.
   *
   * @param path configuration path of the sub-tree; must exist
   */
  def getConfigMap(path: String): Map[String, String] = {
    conf.getConfig(path).entrySet().asScala.map { entry =>
      val key = s"$path.${entry.getKey}"
      val value = conf.getString(key)
      println(s"${this.getClass.getName}: $key -> ${printable(key, value)}")
      key -> value
    }.toMap
  }

  /** Text to echo for a value; hides it when the key looks like a credential. */
  private def printable(key: String, value: Any): String = {
    if (key.toLowerCase.contains("password")) "******" else String.valueOf(value)
  }
}

// File (deleted): s2counter_core/src/main/scala/s2/config/S2CounterConfig.scala
package s2.config

import com.typesafe.config.Config

import scala.collection.JavaConverters._

/**
 * Created by hsleep(honeysl...@gmail.com) on 15. 3. 2..
 *
 * Named settings of the counter module. Each value is read lazily from `config`
 * on first access and falls back to the default shown here.
 */
class S2CounterConfig(config: Config) extends ConfigFunctions(config) {
  // HBase
  lazy val HBASE_ZOOKEEPER_QUORUM = getOrElse("hbase.zookeeper.quorum", "")
  lazy val HBASE_TABLE_NAME = getOrElse("hbase.table.name", "s2counter")
  lazy val HBASE_TABLE_POOL_SIZE = getOrElse("hbase.table.pool.size", 100)
  lazy val HBASE_CONNECTION_POOL_SIZE = getOrElse("hbase.connection.pool.size", 10)

  lazy val HBASE_CLIENT_IPC_POOL_SIZE = getOrElse("hbase.client.ipc.pool.size", 5)
  lazy val HBASE_CLIENT_MAX_TOTAL_TASKS = getOrElse("hbase.client.max.total.tasks", 100)
  lazy val HBASE_CLIENT_MAX_PERSERVER_TASKS = getOrElse("hbase.client.max.perserver.tasks", 5)
  lazy val HBASE_CLIENT_MAX_PERREGION_TASKS = getOrElse("hbase.client.max.perregion.tasks", 1)
  lazy val HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD = getOrElse("hbase.client.scanner.timeout.period", 300)
  lazy val HBASE_CLIENT_OPERATION_TIMEOUT = getOrElse("hbase.client.operation.timeout", 100)
  lazy val HBASE_CLIENT_RETRIES_NUMBER = getOrElse("hbase.client.retries.number", 1)

  // MySQL
  lazy val DB_DEFAULT_DRIVER = getOrElse("db.default.driver", "com.mysql.jdbc.Driver")
  lazy val DB_DEFAULT_URL = getOrElse("db.default.url", "")
  lazy val DB_DEFAULT_USER = getOrElse("db.default.user", "graph")
  lazy val DB_DEFAULT_PASSWORD = getOrElse("db.default.password", "graph")

  // Redis: entries are "host" or "host:port"; the port defaults to 6379
  lazy val REDIS_INSTANCES = (for {
    s <- config.getStringList("redis.instances").asScala
  } yield {
    val sp = s.split(':')
    (sp(0), if (sp.length > 1) sp(1).toInt else 6379)
  }).toList

  // Graph
  lazy val GRAPH_URL = getOrElse("s2graph.url", "http://localhost:9000")
  lazy val GRAPH_READONLY_URL = getOrElse("s2graph.read-only.url", GRAPH_URL)

  // Cache
  lazy val CACHE_TTL_SECONDS = getOrElse("cache.ttl.seconds", 600)
  lazy val CACHE_MAX_SIZE = getOrElse("cache.max.size", 10000)
  lazy val CACHE_NEGATIVE_TTL_SECONDS = getOrElse("cache.negative.ttl.seconds", CACHE_TTL_SECONDS)
}
// File (deleted): s2counter_core/src/main/scala/s2/counter/TrxLog.scala
package s2.counter

import play.api.libs.json.Json

/**
 * Created by hsleep(honeysl...@gmail.com) on 15. 4. 7..
 */
// item1 -> likedCount -> month:2015-10, 1
// edge
  // policyId = Label.findByName(likedCount).id.get
  // item = edge.srcVertexId
  // results =
/**
 * Record of one counter update for a single item.
 *
 * @param success  whether the update produced any result
 * @param policyId id of the counter policy the update belongs to
 * @param item     key of the counted item
 * @param results  one entry per updated (interval, timestamp, dimension) cell
 */
case class TrxLog(success: Boolean, policyId: Int, item: String, results: Iterable[TrxLogResult])

// interval = m, ts = 2015-10, "age.gender.20.M", 1, 2
/**
 * Result for one counter cell.
 *
 * @param interval  interval unit name
 * @param ts        timestamp of the cell
 * @param dimension encoded dimension string
 * @param value     amount that was applied
 * @param result    value reported back by the update; defaults to -1
 */
case class TrxLogResult(interval: String, ts: Long, dimension: String, value: Long, result: Long = -1)

/** play-json (de)serializers for [[TrxLogResult]], generated by the `Json` macros. */
object TrxLogResult {
  implicit val writes = Json.writes[TrxLogResult]
  implicit val reads = Json.reads[TrxLogResult]
  implicit val formats = Json.format[TrxLogResult]
}

/** play-json (de)serializers for [[TrxLog]], generated by the `Json` macros. */
object TrxLog {
  implicit val writes = Json.writes[TrxLog]
  implicit val reads = Json.reads[TrxLog]
  implicit val formats = Json.format[TrxLog]
}

// File (deleted): s2counter_core/src/main/scala/s2/counter/core/BytesUtil.scala
package s2.counter.core

/**
 * Created by hsleep(honeysl...@gmail.com) on 15. 6. 11..
 */
/**
 * Byte-level encoding of counter keys and qualifiers. Implementations define
 * the concrete layout.
 */
trait BytesUtil {
  /** Row-key prefix bytes derived from a policy id. */
  def getRowKeyPrefix(id: Int): Array[Byte]

  /** Encodes an exact-counter key. */
  def toBytes(key: ExactKeyTrait): Array[Byte]
  /** Encodes a qualifier including its dimension. */
  def toBytes(eq: ExactQualifier): Array[Byte]
  /** Encodes the interval/timestamp part of a qualifier. */
  def toBytes(tq: TimedQualifier): Array[Byte]

  /** Decodes bytes produced by `toBytes(eq: ExactQualifier)`. */
  def toExactQualifier(bytes: Array[Byte]): ExactQualifier
  /** Decodes bytes produced by `toBytes(tq: TimedQualifier)`. */
  def toTimedQualifier(bytes: Array[Byte]): TimedQualifier
}

// File (deleted): s2counter_core/src/main/scala/s2/counter/core/ExactCounter.scala
package s2.counter.core

import com.typesafe.config.Config
import org.slf4j.LoggerFactory
import s2.counter.core.TimedQualifier.IntervalUnit
import s2.counter.core.TimedQualifier.IntervalUnit.IntervalUnit
import s2.counter.decay.ExpDecayFormula
import s2.counter.{TrxLog, TrxLogResult}
import s2.models.Counter
import s2.util.{CollectionCache, CollectionCacheConfig, FunctionParser}

import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future}

/**
 * Created by hsleep(honeysl...@gmail.com) on 15. 6. 11..
 */

/** One stored row: a key and its count per qualifier. */
case class ExactCounterRow(key: ExactKeyTrait, value: Map[ExactQualifier, Long])

/** Counts fetched for one key, keyed by qualifier. */
case class FetchedCounts(exactKey: ExactKeyTrait, qualifierWithCountMap: Map[ExactQualifier, Long])
/** Counts for one key after a decay formula has been applied (hence `Double`). */
case class DecayedCounts(exactKey: ExactKeyTrait, qualifierWithCountMap: Map[ExactQualifier, Double])

/** Counts fetched for one key, grouped by (interval, dimension key/values). */
case class FetchedCountsGrouped(exactKey: ExactKeyTrait, intervalWithCountMap: Map[(IntervalUnit, Map[String, String]), Map[ExactQualifier, Long]])

/**
 * Facade over an [[ExactStorage]]: asynchronous reads, blocking variants that
 * wait up to `syncDuration`, decayed sums, and updates guarded by a cached
 * readiness check.
 */
class ExactCounter(config: Config, storage: ExactStorage) {
  import ExactCounter._

  // upper bound for every blocking read in this class
  val syncDuration = Duration(10, SECONDS)
  private val log = LoggerFactory.getLogger(getClass)

  // caches the storage readiness flag per policy id
  // NOTE(review): meaning of the positional arguments (1000, 60, 60) is defined by CollectionCacheConfig - confirm
  val storageStatusCache = new CollectionCache[Option[Boolean]](CollectionCacheConfig(1000, 60, negativeCache = false, 60))

  // dimension: age, value of ages
  /** Single-item variant of [[getCountsAsync]]; yields the first result, if any. */
  def getCountAsync(policy: Counter,
                    itemId: String,
                    intervals: Seq[IntervalUnit],
                    from: Long,
                    to: Long,
                    dimension: Map[String, Set[String]])
                   (implicit ex: ExecutionContext): Future[Option[FetchedCountsGrouped]] = {
    for {
      fetchedCounts <- getCountsAsync(policy, Seq(itemId),
        intervals.map(interval => (TimedQualifier(interval, from), TimedQualifier(interval, to))), dimension)
    } yield {
      fetchedCounts.headOption
    }
  }

  // multi item, time range and multi dimension
  /** Delegates directly to the storage range query with dimension filter. */
  def getCountsAsync(policy: Counter,
                     items: Seq[String],
                     timeRange: Seq[(TimedQualifier, TimedQualifier)],
                     dimQuery: Map[String, Set[String]])
                    (implicit ex: ExecutionContext): Future[Seq[FetchedCountsGrouped]] = {
    storage.get(policy, items, timeRange, dimQuery)
  }

  /** Blocking single-item query with dimension filter; see [[getCounts]]. */
  def getCount(policy: Counter,
               itemId: String,
               intervals: Seq[IntervalUnit],
               from: Long,
               to: Long,
               dimension: Map[String, Set[String]])
              (implicit ex: ExecutionContext): Option[FetchedCountsGrouped] = {
    getCounts(policy, Seq(itemId),
      intervals.map(interval => (TimedQualifier(interval, from), TimedQualifier(interval, to))), dimension).headOption
  }

  /** Blocking single-item range query without dimension filter; waits up to `syncDuration`. */
  def getCount(policy: Counter,
               itemId: String,
               intervals: Seq[IntervalUnit],
               from: Long,
               to: Long)
              (implicit ex: ExecutionContext): Option[FetchedCounts] = {
    val future = storage.get(policy,
      Seq(itemId),
      intervals.map(interval => (TimedQualifier(interval, from), TimedQualifier(interval, to))))
    Await.result(future, syncDuration).headOption
  }

  // multi item, time range and multi dimension
  /** Blocking form of [[getCountsAsync]]; `Await.result` throws if `syncDuration` elapses. */
  def getCounts(policy: Counter,
                items: Seq[String],
                timeRange: Seq[(TimedQualifier, TimedQualifier)],
                dimQuery: Map[String, Set[String]])
               (implicit ex: ExecutionContext): Seq[FetchedCountsGrouped] = {
    Await.result(storage.get(policy, items, timeRange, dimQuery), syncDuration)
  }

  /**
   * Blocking lookup of exact qualifiers for several item keys.
   *
   * @return item key -> (qualifier -> count)
   */
  def getRelatedCounts(policy: Counter, keyWithQualifiers: Seq[(String, Seq[ExactQualifier])])
                      (implicit ex: ExecutionContext): Map[String, Map[ExactQualifier, Long]] = {
    val queryKeyWithQualifiers = {
      for {
        (itemKey, qualifiers) <- keyWithQualifiers
      } yield {
        val relKey = ExactKey(policy.id, policy.version, policy.itemType, itemKey)
        (relKey, qualifiers)
      }
    }
    val future = storage.get(policy, queryKeyWithQualifiers)

    for {
      FetchedCounts(exactKey, exactQualifierToLong) <- Await.result(future, syncDuration)
    } yield {
      exactKey.itemKey -> exactQualifierToLong
    }
  }.toMap

  /**
   * Like [[getRelatedCounts]], but reads the interval immediately before each
   * requested qualifier and reports the counts under the requested qualifiers.
   */
  def getPastCounts(policy: Counter, keyWithQualifiers: Seq[(String, Seq[ExactQualifier])])
                   (implicit ex: ExecutionContext): Map[String, Map[ExactQualifier, Long]] = {
    // query past counts: shift every qualifier one interval back
    val queryKeyWithQualifiers = {
      for {
        (itemKey, qualifiers) <- keyWithQualifiers
      } yield {
        val relKey = ExactKey(policy.id, policy.version, policy.itemType, itemKey)
        (relKey, qualifiers.map(eq => eq.copy(tq = eq.tq.add(-1))))
      }
    }
    val future = storage.get(policy, queryKeyWithQualifiers)

    for {
      FetchedCounts(exactKey, exactQualifierToLong) <- Await.result(future, syncDuration)
    } yield {
      // restore tq
      exactKey.itemKey -> exactQualifierToLong.map { case (eq, v) =>
        eq.copy(tq = eq.tq.add(1)) -> v
      }
    }
  }.toMap

  /**
   * Fetches grouped counts and collapses each (interval, dimension) group into
   * one sum, optionally weighting every count with a decay function.
   *
   * @param qsSum optional function expression parsed by `FunctionParser`; only
   *              "exp_decay" is accepted, anything else raises
   *              `UnsupportedOperationException`. `None` gives a plain sum.
   */
  def getDecayedCountsAsync(policy: Counter,
                            items: Seq[String],
                            timeRange: Seq[(TimedQualifier, TimedQualifier)],
                            dimQuery: Map[String, Set[String]],
                            qsSum: Option[String])(implicit ex: ExecutionContext): Future[Seq[DecayedCounts]] = {
    val groupedTimeRange = timeRange.groupBy(_._1.q)
    getCountsAsync(policy, items, timeRange, dimQuery).map { seq =>
      for {
        FetchedCountsGrouped(k, intervalWithCountMap) <- seq
      } yield {
        DecayedCounts(k, {
          for {
            ((interval, dimKeyValues), grouped) <- intervalWithCountMap
          } yield {
            // first requested range of this interval; throws if the interval was not requested
            val (tqFrom, tqTo) = groupedTimeRange(interval).head
            val formula = {
              for {
                strSum <- qsSum
                (func, arg) <- FunctionParser(strSum)
              } yield {
                // apply function
                func.toLowerCase match {
                  case "exp_decay" => ExpDecayFormula.byMeanLifeTime(arg.toLong * TimedQualifier.getTsUnit(interval))
                  case _ => throw new UnsupportedOperationException(s"unknown function: $strSum")
                }
              }
            }
            // the sum is reported under the range start; decay is driven by distance to the range end
            ExactQualifier(tqFrom, dimKeyValues) -> {
              grouped.map { case (eq, count) =>
                formula match {
                  case Some(decay) =>
                    decay(count, tqTo.ts - eq.tq.ts)
                  case None =>
                    count
                }
              }.sum
            }
          }
        })
      }
    }
  }

  /**
   * Applies `counts` through the storage and reports one [[TrxLog]] per input key.
   *
   * @return empty when the storage is not ready for `policy`
   */
  def updateCount(policy: Counter, counts: Seq[(ExactKeyTrait, ExactValueMap)]): Seq[TrxLog] = {
    ready(policy) match {
      case true =>
        val updateResults = storage.update(policy, counts)
        for {
          (exactKey, values) <- counts
          results = updateResults.getOrElse(exactKey, Nil.toMap)
        } yield {
          // a key absent from the storage result is logged as unsuccessful
          TrxLog(results.nonEmpty, exactKey.policyId, exactKey.itemKey, makeTrxLogResult(values, results))
        }
      case false =>
        Nil
    }
  }

  /** Delegates to the storage. */
  def deleteCount(policy: Counter, keys: Seq[ExactKeyTrait]): Unit = {
    storage.delete(policy, keys)
  }

  /** Pairs every requested value with its stored result; -1 marks a missing result. */
  private def makeTrxLogResult(values: ExactValueMap, results: ExactValueMap): Seq[TrxLogResult] = {
    for {
      (eq, value) <- values
    } yield {
      val result = results.getOrElse(eq, -1l)
      TrxLogResult(eq.tq.q.toString, eq.tq.ts, eq.dimension, value, result)
    }
  }.toSeq

  /** Delegates to the storage. */
  def insertBlobValue(policy: Counter, keys: Seq[BlobExactKey]): Seq[Boolean] = {
    storage.insertBlobValue(policy, keys)
  }

  /** Delegates to the storage. */
  def getBlobValue(policy: Counter, blobId: String): Option[String] = {
    storage.getBlobValue(policy, blobId)
  }

  /** Delegates to the storage. */
  def prepare(policy: Counter) = {
    storage.prepare(policy)
  }

  /** Delegates to the storage. */
  def destroy(policy: Counter) = {
    storage.destroy(policy)
  }

  /** Storage readiness for `policy`, looked up through `storageStatusCache` by policy id. */
  def ready(policy: Counter): Boolean = {
    storageStatusCache.withCache(s"${policy.id}") {
      val ready = storage.ready(policy)
      if (!ready) {
        // if key is not in cache, log message
        log.warn(s"${policy.service}.${policy.action} storage is not ready.")
      }
      Some(ready)
    }.getOrElse(false)
  }
}

object ExactCounter {
  /** Column family names: "s" and "l". */
  object ColumnFamily extends Enumeration {
    type ColumnFamily = Value

    val SHORT = Value("s")
    val LONG = Value("l")
  }
  import IntervalUnit._
  // minutely/hourly cells go to SHORT, daily/monthly/total cells to LONG
  val intervalsMap = Map(MINUTELY -> ColumnFamily.SHORT, HOURLY -> ColumnFamily.SHORT,
    DAILY -> ColumnFamily.LONG, MONTHLY -> ColumnFamily.LONG, TOTAL -> ColumnFamily.LONG)

  // blob values use column "b" of the LONG family
  val blobCF = ColumnFamily.LONG.toString.getBytes
  val blobColumn = "b".getBytes

  type ExactValueMap = Map[ExactQualifier, Long]
}

// File (deleted): s2counter_core/src/main/scala/s2/counter/core/ExactKey.scala
package s2.counter.core

import s2.models.Counter
import s2.models.Counter.ItemType
import s2.models.Counter.ItemType.ItemType
import s2.util.Hashes

/**
 * Created by hsleep(honeysl...@gmail.com) on 15. 5. 27..
- */ -trait ExactKeyTrait { - def policyId: Int - def version: Byte - def itemType: ItemType - def itemKey: String -} - -case class ExactKey(policyId: Int, version: Byte, itemType: ItemType, itemKey: String) extends ExactKeyTrait -case class BlobExactKey(policyId: Int, version: Byte, itemType: ItemType, itemKey: String, itemId: String) extends ExactKeyTrait - -object ExactKey { - def apply(policy: Counter, itemId: String, checkItemType: Boolean): ExactKeyTrait = { - if (checkItemType && policy.itemType == ItemType.BLOB) { - BlobExactKey(policy.id, policy.version, ItemType.BLOB, Hashes.sha1(itemId), itemId) - } else { - ExactKey(policy.id, policy.version, policy.itemType, itemId) - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2counter_core/src/main/scala/s2/counter/core/ExactQualifier.scala ---------------------------------------------------------------------- diff --git a/s2counter_core/src/main/scala/s2/counter/core/ExactQualifier.scala b/s2counter_core/src/main/scala/s2/counter/core/ExactQualifier.scala deleted file mode 100644 index f4ac708..0000000 --- a/s2counter_core/src/main/scala/s2/counter/core/ExactQualifier.scala +++ /dev/null @@ -1,78 +0,0 @@ -package s2.counter.core - -import java.util - -import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache} -import s2.counter.core.TimedQualifier.IntervalUnit.IntervalUnit - -import scala.collection.JavaConversions._ - -/** - * Created by hsleep(honeysl...@gmail.com) on 15. 5. 27.. 
- */ -case class ExactQualifier(tq: TimedQualifier, dimKeyValues: Map[String, String], dimension: String) { - def checkDimensionEquality(dimQuery: Map[String, Set[String]]): Boolean = { -// println(s"self: $dimKeyValues, query: $dimQuery") - dimQuery.size == dimKeyValues.size && { - for { - (k, v) <- dimKeyValues - } yield { - dimQuery.get(k).exists(qv => qv.isEmpty || qv.contains(v)) - } - }.forall(x => x) - } -} - -object ExactQualifier { - val cache: LoadingCache[String, Map[String, String]] = CacheBuilder.newBuilder() - .maximumSize(10000) - .build( - new CacheLoader[String, Map[String, String]]() { - def load(s: String): Map[String, String] = { - strToDimensionMap(s) - } - } - ) - - def apply(tq: TimedQualifier, dimension: String): ExactQualifier = { - ExactQualifier(tq, cache.get(dimension), dimension) - } - - def apply(tq: TimedQualifier, dimKeyValues: Map[String, String]): ExactQualifier = { - ExactQualifier(tq, dimKeyValues, makeDimensionStr(dimKeyValues)) - } - - def makeSortedDimension(dimKeyValues: Map[String, String]): Iterator[String] = { - val sortedDimKeyValues = new util.TreeMap[String, String](dimKeyValues) - sortedDimKeyValues.keysIterator ++ sortedDimKeyValues.valuesIterator - } - - def makeDimensionStr(dimKeyValues: Map[String, String]): String = { - makeSortedDimension(dimKeyValues).mkString(".") - } - - def getQualifiers(intervals: Seq[IntervalUnit], ts: Long, dimKeyValues: Map[String, String]): Seq[ExactQualifier] = { - for { - tq <- TimedQualifier.getQualifiers(intervals, ts) - } yield { - ExactQualifier(tq, dimKeyValues, makeDimensionStr(dimKeyValues)) - } - } - - def strToDimensionMap(dimension: String): Map[String, String] = { - val dimSp = { - val sp = dimension.split('.') - if (dimension == ".") { - Array("", "") - } - else if (dimension.nonEmpty && dimension.last == '.') { - sp ++ Array("") - } else { - sp - } - } - val dimKey = dimSp.take(dimSp.length / 2) - val dimVal = dimSp.takeRight(dimSp.length / 2) - dimKey.zip(dimVal).toMap - 
} -} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2counter_core/src/main/scala/s2/counter/core/ExactStorage.scala ---------------------------------------------------------------------- diff --git a/s2counter_core/src/main/scala/s2/counter/core/ExactStorage.scala b/s2counter_core/src/main/scala/s2/counter/core/ExactStorage.scala deleted file mode 100644 index 6a81f41..0000000 --- a/s2counter_core/src/main/scala/s2/counter/core/ExactStorage.scala +++ /dev/null @@ -1,35 +0,0 @@ -package s2.counter.core - -import s2.counter.core.ExactCounter.ExactValueMap -import s2.models.Counter - -import scala.concurrent.{ExecutionContext, Future} - -/** - * Created by shon on 8/12/15. - */ -trait ExactStorage { - // for range query and check dimension - def get(policy: Counter, - items: Seq[String], - timeRange: Seq[(TimedQualifier, TimedQualifier)], - dimQuery: Map[String, Set[String]]) - (implicit ec: ExecutionContext): Future[Seq[FetchedCountsGrouped]] - // for range query - def get(policy: Counter, - items: Seq[String], - timeRange: Seq[(TimedQualifier, TimedQualifier)]) - (implicit ec: ExecutionContext): Future[Seq[FetchedCounts]] - // for query exact qualifier - def get(policy: Counter, - queries: Seq[(ExactKeyTrait, Seq[ExactQualifier])]) - (implicit ec: ExecutionContext): Future[Seq[FetchedCounts]] - def update(policy: Counter, counts: Seq[(ExactKeyTrait, ExactValueMap)]): Map[ExactKeyTrait, ExactValueMap] - def delete(policy: Counter, keys: Seq[ExactKeyTrait]): Unit - def insertBlobValue(policy: Counter, keys: Seq[BlobExactKey]): Seq[Boolean] - def getBlobValue(policy: Counter, blobId: String): Option[String] - - def prepare(policy: Counter): Unit - def destroy(policy: Counter): Unit - def ready(policy: Counter): Boolean -} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2counter_core/src/main/scala/s2/counter/core/RankingCounter.scala ---------------------------------------------------------------------- diff --git 
// File (deleted): s2counter_core/src/main/scala/s2/counter/core/RankingCounter.scala
package s2.counter.core

import java.util.concurrent.TimeUnit

import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}
import com.typesafe.config.Config
import org.slf4j.LoggerFactory
import s2.counter.core.RankingCounter.RankingValueMap
import s2.models.Counter
import s2.util.{CollectionCache, CollectionCacheConfig}

import scala.collection.JavaConversions._

/**
 * Created by hsleep(honeysl...@gmail.com) on 15. 6. 19..
 */
case class RankingRow(key: RankingKey, value: Map[String, RankingValue])
case class RateRankingRow(key: RankingKey, value: Map[String, RateRankingValue])

/**
 * Facade over a [[RankingStorage]] that serves rankings of already-closed
 * periods from an in-memory cache and always reads the current period from
 * the storage.
 */
class RankingCounter(config: Config, storage: RankingStorage) {
  private val log = LoggerFactory.getLogger(getClass)

  // caches the storage readiness flag per policy id
  val storageStatusCache = new CollectionCache[Option[Boolean]](CollectionCacheConfig(1000, 60, negativeCache = false, 60))

  // full rankings of closed periods; entries expire 10 minutes after being written.
  // A missing ranking is cached as RankingResult(-1, Nil).
  val cache: LoadingCache[RankingKey, RankingResult] = CacheBuilder.newBuilder()
    .maximumSize(1000000)
    .expireAfterWrite(10l, TimeUnit.MINUTES)
    .build(
      new CacheLoader[RankingKey, RankingResult]() {
        def load(rankingKey: RankingKey): RankingResult = {
          storage.getTopK(rankingKey, Int.MaxValue).getOrElse(RankingResult(-1, Nil))
        }
      }
    )

  /** True when `tq` denotes the period of its interval that contains `now`. */
  private def isCurrentPeriod(tq: TimedQualifier, now: Long): Boolean = {
    TimedQualifier.getQualifiers(Seq(tq.q), now).head == tq
  }

  /**
   * Top `k` entries for `rankingKey`.
   *
   * @return `None` when a cached (closed-period) ranking has no values; for the
   *         current period, whatever the storage returns
   */
  def getTopK(rankingKey: RankingKey, k: Int = Int.MaxValue): Option[RankingResult] = {
    if (isCurrentPeriod(rankingKey.eq.tq, System.currentTimeMillis())) {
      // the current period is still changing: do not use cache
      storage.getTopK(rankingKey, k)
    } else {
      val result = cache.get(rankingKey)
      if (result.values.nonEmpty) Some(result.copy(values = result.values.take(k)))
      else None
    }
  }

  /** Delegates to the storage. */
  def update(key: RankingKey, value: RankingValueMap, k: Int): Unit = {
    storage.update(key, value, k)
  }

  /** Delegates to the storage. */
  def update(values: Seq[(RankingKey, RankingValueMap)], k: Int): Unit = {
    storage.update(values, k)
  }

  /** Delegates to the storage. */
  def delete(key: RankingKey): Unit = {
    storage.delete(key)
  }

  /**
   * Distinct item ids appearing in the rankings of `keys`.
   *
   * Closed-period keys already in the cache are served from it; everything else
   * is fetched from the storage with limit `k`.
   */
  def getAllItems(keys: Seq[RankingKey], k: Int = Int.MaxValue): Seq[String] = {
    val now = System.currentTimeMillis()
    val oldKeys = keys.filterNot(key => isCurrentPeriod(key.eq.tq, now))
    val cached = cache.getAllPresent(oldKeys)
    val missed = keys.diff(cached.keys.toSeq)
    val found = storage.getTopK(missed, k)

    // Only closed periods may be cached: a snapshot of the current period would
    // otherwise be served by getTopK as final data once the period rolls over.
    // NOTE(review): results are cached as fetched, i.e. limited to `k`, while the
    // cache loader stores full rankings - confirm callers pass a large enough `k`.
    val cacheable = oldKeys.toSet
    for {
      (key, result) <- found
      if cacheable.contains(key)
    } {
      cache.put(key, result)
    }

    for {
      (key, RankingResult(totalScore, values)) <- cached ++ found
      (item, score) <- values
    } yield {
      item
    }
  }.toSeq.distinct

  /** Delegates to the storage. */
  def prepare(policy: Counter): Unit = {
    storage.prepare(policy)
  }

  /** Delegates to the storage. */
  def destroy(policy: Counter): Unit = {
    storage.destroy(policy)
  }

  /** Storage readiness for `policy`, looked up through `storageStatusCache` by policy id. */
  def ready(policy: Counter): Boolean = {
    storageStatusCache.withCache(s"${policy.id}") {
      Some(storage.ready(policy))
    }.getOrElse(false)
  }
}

object RankingCounter {
  type RankingValueMap = Map[String, RankingValue]
}

// File (deleted): s2counter_core/src/main/scala/s2/counter/core/RankingKey.scala
package s2.counter.core

/**
 * Created by hsleep(honeysl...@gmail.com) on 15. 6. 19..
 */
/** Identifies one ranking: policy, storage version, and the period/dimension qualifier. */
case class RankingKey(policyId: Int, version: Byte, eq: ExactQualifier)

// File (deleted): s2counter_core/src/main/scala/s2/counter/core/RankingResult.scala
package s2.counter.core

/**
 * Created by hsleep(honeysl...@gmail.com) on 15. 6. 19..
 */
/** A ranking: a total score and (item, score) pairs. */
case class RankingResult(totalScore: Double, values: Seq[(String, Double)])

// File (deleted): s2counter_core/src/main/scala/s2/counter/core/RankingStorage.scala
package s2.counter.core

import s2.counter.core.RankingCounter.RankingValueMap
import s2.models.Counter

/**
 * Created by hsleep(honeysl...@gmail.com) on 15. 6. 22..
- */ -trait RankingStorage { - def getTopK(key: RankingKey, k: Int): Option[RankingResult] - def getTopK(keys: Seq[RankingKey], k: Int): Seq[(RankingKey, RankingResult)] - def update(key: RankingKey, value: RankingValueMap, k: Int): Unit - def update(values: Seq[(RankingKey, RankingValueMap)], k: Int): Unit - def delete(key: RankingKey) - - def prepare(policy: Counter): Unit - def destroy(policy: Counter): Unit - def ready(policy: Counter): Boolean -} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2counter_core/src/main/scala/s2/counter/core/RankingValue.scala ---------------------------------------------------------------------- diff --git a/s2counter_core/src/main/scala/s2/counter/core/RankingValue.scala b/s2counter_core/src/main/scala/s2/counter/core/RankingValue.scala deleted file mode 100644 index 4b67633..0000000 --- a/s2counter_core/src/main/scala/s2/counter/core/RankingValue.scala +++ /dev/null @@ -1,19 +0,0 @@ -package s2.counter.core - -/** - * Created by hsleep(honeysl...@gmail.com) on 15. 6. 22.. 
- */ - -/** - * ranking score and increment value - * @param score ranking score - * @param increment increment value for v1 - */ -case class RankingValue(score: Double, increment: Double) - -object RankingValue { - def reduce(r1: RankingValue, r2: RankingValue): RankingValue = { - // maximum score and sum of increment - RankingValue(math.max(r1.score, r2.score), r1.increment + r2.increment) - } -} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2counter_core/src/main/scala/s2/counter/core/RateRankingValue.scala ---------------------------------------------------------------------- diff --git a/s2counter_core/src/main/scala/s2/counter/core/RateRankingValue.scala b/s2counter_core/src/main/scala/s2/counter/core/RateRankingValue.scala deleted file mode 100644 index eca3abc..0000000 --- a/s2counter_core/src/main/scala/s2/counter/core/RateRankingValue.scala +++ /dev/null @@ -1,18 +0,0 @@ -package s2.counter.core - -/** - * Created by hsleep(honeysl...@gmail.com) on 15. 7. 2.. - */ -case class RateRankingValue(actionScore: Double, baseScore: Double) { - // increment score do not use. 
- lazy val rankingValue: RankingValue = { - RankingValue(actionScore / math.max(1d, baseScore), 0) - } -} - -object RateRankingValue { - def reduce(r1: RateRankingValue, r2: RateRankingValue): RateRankingValue = { - // maximum score and sum of increment - RateRankingValue(math.max(r1.actionScore, r2.actionScore), math.max(r1.baseScore, r2.baseScore)) - } -} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2counter_core/src/main/scala/s2/counter/core/TimedQualifier.scala ---------------------------------------------------------------------- diff --git a/s2counter_core/src/main/scala/s2/counter/core/TimedQualifier.scala b/s2counter_core/src/main/scala/s2/counter/core/TimedQualifier.scala deleted file mode 100644 index b763bc2..0000000 --- a/s2counter_core/src/main/scala/s2/counter/core/TimedQualifier.scala +++ /dev/null @@ -1,218 +0,0 @@ -package s2.counter.core - -import java.text.SimpleDateFormat -import java.util.Calendar - -/** - * Created by hsleep(honeysl...@gmail.com) on 15. 6. 8.. 
 */
/**
 * An interval unit together with a timestamp in epoch milliseconds.
 *
 * @param q  interval unit
 * @param ts timestamp in milliseconds
 */
case class TimedQualifier(q: TimedQualifier.IntervalUnit.Value, ts: Long) {
  import TimedQualifier.IntervalUnit._

  /** `ts` rendered as the number yyyyMMddHHmm, using the JVM default time zone. */
  def dateTime: Long = {
    val dateFormat = new SimpleDateFormat("yyyyMMddHHmm")
    dateFormat.format(ts).toLong
  }

  /**
   * Shifts the timestamp by `amount` units of this qualifier's interval
   * (negative values go back in time). TOTAL is returned unchanged.
   */
  def add(amount: Int): TimedQualifier = {
    val cal = Calendar.getInstance()
    cal.setTimeInMillis(ts)
    q match {
      case MINUTELY =>
        cal.add(Calendar.MINUTE, amount)
      case HOURLY =>
        cal.add(Calendar.HOUR, amount)
      case DAILY =>
        cal.add(Calendar.DAY_OF_MONTH, amount)
      case MONTHLY =>
        cal.add(Calendar.MONTH, amount)
      case TOTAL =>
    }
    copy(ts = cal.getTimeInMillis)
  }
}

object TimedQualifier {
  /** Interval units; the names are the single-letter codes used when encoding. */
  object IntervalUnit extends Enumeration {
    type IntervalUnit = Value
    val TOTAL = Value("t")
    val MONTHLY = Value("M")
    val DAILY = Value("d")
    val HOURLY = Value("H")
    val MINUTELY = Value("m")
  }

  /** Builds a qualifier from an interval code; `withName` throws on an unknown code. */
  def apply(q: String, ts: Long): TimedQualifier = TimedQualifier(IntervalUnit.withName(q), ts)

  import IntervalUnit._

  /**
   * Nominal length of one interval in milliseconds; a month counts as 31 days.
   * Throws `RuntimeException` for TOTAL.
   */
  def getTsUnit(intervalUnit: IntervalUnit.IntervalUnit): Long = {
    intervalUnit match {
      case MINUTELY => 1 * 60 * 1000l
      case HOURLY => 60 * 60 * 1000l
      case DAILY => 24 * 60 * 60 * 1000l
      case MONTHLY => 31 * 24 * 60 * 60 * 1000l
      case v: IntervalUnit.IntervalUnit =>
        throw new RuntimeException(s"unsupported operation for ${v.toString}")
    }
  }

  /**
   * For each requested interval, the qualifier whose timestamp is the start of
   * the period containing `millis` (default time zone). TOTAL always maps to 0.
   */
  def getQualifiers(intervals: Seq[IntervalUnit], millis: Long): Seq[TimedQualifier] = {
    val cal = Calendar.getInstance()
    cal.setTimeInMillis(millis)

    // start of the month, then progressively restore day, hour and minute;
    // each step's snapshot is the start of that finer-grained period
    val newCal = Calendar.getInstance()
    newCal.set(cal.get(Calendar.YEAR), cal.get(Calendar.MONTH), 1, 0, 0, 0)
    newCal.set(Calendar.MILLISECOND, 0)
    val month = newCal.getTimeInMillis
    val Seq(day, hour, minute) = {
      for {
        field <- Seq(Calendar.DATE, Calendar.HOUR_OF_DAY, Calendar.MINUTE)
      } yield {
        newCal.set(field, cal.get(field))
        newCal.getTimeInMillis
      }
    }

    for {
      interval <- intervals
    } yield {
      val ts = interval match {
        case MINUTELY => minute
        case HOURLY => hour
        case DAILY => day
        case MONTHLY => month
        case TOTAL => 0L
      }
      TimedQualifier(interval, ts)
    }
  }

  // descending order
  /**
   * For each interval, `limit` qualifiers stepping back in time from `tsOpt`
   * (default: now), newest first. TOTAL yields a single qualifier.
   */
  def getQualifiersToLimit(intervals: Seq[IntervalUnit], limit: Int, tsOpt: Option[Long] = None): Seq[TimedQualifier] = {
    val ts = tsOpt.getOrElse(System.currentTimeMillis())
    for {
      interval <- intervals
      newLimit = if (interval == TOTAL) 1 else limit
      i <- 0 until (-newLimit, -1)
    } yield {
      val newMillis = nextTime(interval, ts, i)
      TimedQualifier(interval, newMillis)
    }
  }

  /**
   * Truncates `ts` to the fields finer than `interval` (seconds and millis for
   * MINUTELY, up to day-of-month for MONTHLY) and then moves `i` intervals
   * forward (or back when negative). TOTAL always gives 0.
   */
  private def nextTime(interval: IntervalUnit, ts: Long, i: Int): Long = {
    val newCal = Calendar.getInstance()
    newCal.setTimeInMillis(ts)
    newCal.set(Calendar.MILLISECOND, 0)
    interval match {
      case MINUTELY =>
        newCal.set(Calendar.SECOND, 0)
        newCal.add(Calendar.MINUTE, i)
        newCal.getTimeInMillis
      case HOURLY =>
        newCal.set(Calendar.SECOND, 0)
        newCal.set(Calendar.MINUTE, 0)
        newCal.add(Calendar.HOUR_OF_DAY, i)
        newCal.getTimeInMillis
      case DAILY =>
        newCal.set(Calendar.SECOND, 0)
        newCal.set(Calendar.MINUTE, 0)
        newCal.set(Calendar.HOUR_OF_DAY, 0)
        newCal.add(Calendar.DAY_OF_MONTH, i)
        newCal.getTimeInMillis
      case MONTHLY =>
        newCal.set(Calendar.SECOND, 0)
        newCal.set(Calendar.MINUTE, 0)
        newCal.set(Calendar.HOUR_OF_DAY, 0)
        newCal.set(Calendar.DAY_OF_MONTH, 1)
        newCal.add(Calendar.MONTH, i)
        newCal.getTimeInMillis
      case TOTAL =>
        0L
    }
  }

  /**
   * Successive period starts after `from`, ending with the first one that is
   * not before `to` (that one is included). TOTAL yields `List(0)`.
   *
   * @param rst accumulator for the recursion; callers normally leave the default
   */
  def getTimeList(interval: IntervalUnit, from: Long, to: Long, rst: List[Long] = Nil): List[Long] = {
    interval match {
      case TOTAL => List(0)
      case _ =>
        val next = nextTime(interval, from, 1)
        if (next < from) {
          // ignore
          getTimeList(interval, next, to, rst)
        }
        else if (next < to) {
          // recall
          getTimeList(interval, next, to, rst :+ next)
        } else {
          // end condition
          rst :+ next
        }
    }
  }

  // for reader
  /**
   * Per interval, the qualifiers from [[getTimeList]] over a window. A missing
   * bound is derived from the other one by moving `limit - 1` intervals; with
   * both missing the window ends now.
   */
  def getQualifiersToLimit(intervals: Seq[IntervalUnit],
                           limit: Int,
                           optFrom: Option[Long],
                           optTo: Option[Long]): Seq[List[TimedQualifier]] = {
    val newLimit = limit - 1
    for {
      interval <- intervals
    } yield {
      {
        (optFrom, optTo) match {
          case (Some(from), Some(to)) =>
            getTimeList(interval, from, to)
          case (Some(from), None) =>
            getTimeList(interval, from, nextTime(interval, from, newLimit))
          case (None, Some(to)) =>
            getTimeList(interval, nextTime(interval, to, -newLimit), to)
          case (None, None) =>
            val current = System.currentTimeMillis()
            getTimeList(interval, nextTime(interval, current, -newLimit), current)
        }
      }.map { ts =>
        TimedQualifier(interval, ts)
      }
    }
  }

  /**
   * One (from, to) qualifier pair per interval, all sharing the same bounds.
   * A missing lower bound is derived with the coarsest requested interval, a
   * missing upper bound with the finest one. Throws on empty `intervals`
   * (`maxBy`/`minBy`).
   */
  def getTimeRange(intervals: Seq[IntervalUnit],
                   limit: Int,
                   optFrom: Option[Long],
                   optTo: Option[Long]): Seq[(TimedQualifier, TimedQualifier)] = {
    val newLimit = limit - 1
    val maxInterval = intervals.maxBy {
      case MINUTELY => 0
      case HOURLY => 1
      case DAILY => 2
      case MONTHLY => 3
      case TOTAL => 4
    }
    val minInterval = intervals.minBy {
      case MINUTELY => 0
      case HOURLY => 1
      case DAILY => 2
      case MONTHLY => 3
      case TOTAL => 4
    }
    val (from, to) = (optFrom, optTo) match {
      case (Some(f), Some(t)) =>
        (f, t)
      case (Some(f), None) =>
        (f, nextTime(minInterval, f, newLimit))
      case (None, Some(t)) =>
        (nextTime(maxInterval, t, -newLimit), t)
      case (None, None) =>
        val current = System.currentTimeMillis()
        (nextTime(maxInterval, current, -newLimit), nextTime(minInterval, current, 0))
    }
    for {
      interval <- intervals
    } yield {
      (TimedQualifier(interval, from), TimedQualifier(interval, to))
    }
  }
}
@@ -1,64 +0,0 @@ -package s2.counter.core.v1 - -import org.apache.hadoop.hbase.util.Bytes -import s2.counter.core.TimedQualifier.IntervalUnit -import s2.counter.core._ -import s2.models.Counter.ItemType -import s2.util.Hashes - -import scala.collection.mutable.ArrayBuffer - -/** - * Created by hsleep(honeysl...@gmail.com) on 15. 6. 11.. - */ -object BytesUtilV1 extends BytesUtil { - // ExactKey: [hash(2b)][policy(4b)][item(variable)] - val BUCKET_BYTE_SIZE = Bytes.SIZEOF_SHORT - val POLICY_ID_SIZE = Bytes.SIZEOF_INT - val INTERVAL_SIZE = Bytes.SIZEOF_BYTE - val TIMESTAMP_SIZE = Bytes.SIZEOF_LONG - val TIMED_QUALIFIER_SIZE = INTERVAL_SIZE + TIMESTAMP_SIZE - - override def getRowKeyPrefix(id: Int): Array[Byte] = { - Bytes.toBytes(id) - } - - override def toBytes(key: ExactKeyTrait): Array[Byte] = { - val buff = new ArrayBuffer[Byte] - // hash key (2 byte) - buff ++= Bytes.toBytes(Hashes.murmur3(key.itemKey)).take(BUCKET_BYTE_SIZE) - - buff ++= getRowKeyPrefix(key.policyId) - buff ++= { - key.itemType match { - case ItemType.INT => Bytes.toBytes(key.itemKey.toInt) - case ItemType.LONG => Bytes.toBytes(key.itemKey.toLong) - case ItemType.STRING | ItemType.BLOB => Bytes.toBytes(key.itemKey) - } - } - buff.toArray - } - - override def toBytes(eq: ExactQualifier): Array[Byte] = { - toBytes(eq.tq) ++ eq.dimension.getBytes - } - - override def toBytes(tq: TimedQualifier): Array[Byte] = { - Bytes.toBytes(tq.q.toString) ++ Bytes.toBytes(tq.ts) - } - - override def toExactQualifier(bytes: Array[Byte]): ExactQualifier = { - // qualifier: interval, ts, dimension 순서 - val tq = toTimedQualifier(bytes) - - val dimension = Bytes.toString(bytes, TIMED_QUALIFIER_SIZE, bytes.length - TIMED_QUALIFIER_SIZE) - ExactQualifier(tq, dimension) - } - - override def toTimedQualifier(bytes: Array[Byte]): TimedQualifier = { - val interval = Bytes.toString(bytes, 0, INTERVAL_SIZE) - val ts = Bytes.toLong(bytes, INTERVAL_SIZE) - - TimedQualifier(IntervalUnit.withName(interval), ts) - } -} 
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2counter_core/src/main/scala/s2/counter/core/v1/ExactStorageAsyncHBase.scala ---------------------------------------------------------------------- diff --git a/s2counter_core/src/main/scala/s2/counter/core/v1/ExactStorageAsyncHBase.scala b/s2counter_core/src/main/scala/s2/counter/core/v1/ExactStorageAsyncHBase.scala deleted file mode 100644 index 6aae3cd..0000000 --- a/s2counter_core/src/main/scala/s2/counter/core/v1/ExactStorageAsyncHBase.scala +++ /dev/null @@ -1,321 +0,0 @@ -package s2.counter.core.v1 - -import java.util - -import com.stumbleupon.async.{Callback, Deferred} -import com.typesafe.config.Config -import org.apache.hadoop.hbase.CellUtil -import org.apache.hadoop.hbase.client._ -import org.apache.hadoop.hbase.util.Bytes -import org.hbase.async.{ColumnRangeFilter, FilterList, GetRequest, KeyValue} -import org.slf4j.LoggerFactory -import s2.config.S2CounterConfig -import s2.counter.core.ExactCounter.ExactValueMap -import s2.counter.core._ -import s2.helper.{Management, WithAsyncHBase, WithHBase} -import s2.models.Counter -import s2.models.Counter.ItemType - -import scala.collection.JavaConversions._ -import scala.concurrent.{ExecutionContext, Future} -import scala.util.{Failure, Success} - -/** - * Created by hsleep(honeysl...@gmail.com) on 15. 8. 19.. 
- */ -class ExactStorageAsyncHBase(config: Config) extends ExactStorage { - import ExactStorageHBase._ - - private val log = LoggerFactory.getLogger(getClass) - - lazy val s2config = new S2CounterConfig(config) - - private[counter] val withHBase = new WithHBase(config) - private[counter] val withAsyncHBase = new WithAsyncHBase(config) - private[counter] val hbaseManagement = new Management(config) - - private def getTableName(policy: Counter): String = { - policy.hbaseTable.getOrElse(s2config.HBASE_TABLE_NAME) - } - - override def get(policy: Counter, - items: Seq[String], - timeRange: Seq[(TimedQualifier, TimedQualifier)]) - (implicit ex: ExecutionContext): Future[Seq[FetchedCounts]] = { - - val tableName = getTableName(policy) - - lazy val messageForLog = s"${policy.service}.${policy.action} $items $timeRange" - - val keys = { - for { - item <- items - } yield { - ExactKey(policy, item, checkItemType = true) - } - } - - val gets = { - for { - cf <- timeRange.map(t => intervalsMap(t._1.q)).distinct - key <- keys - } yield { - val get = new GetRequest(tableName, BytesUtilV1.toBytes(key)) - get.family(cf.toString) - get.setFilter(new FilterList({ - for { - (from, to) <- timeRange - } yield { - new ColumnRangeFilter( - BytesUtilV1.toBytes(from), true, - BytesUtilV1.toBytes(to.copy(ts = to.ts + 1)), false) - } - }, FilterList.Operator.MUST_PASS_ONE)) - (key, cf, get) - } - } - -// println(s"$messageForLog $gets") - - withAsyncHBase[Seq[FetchedCounts]] { client => - val deferreds: Seq[Deferred[FetchedCounts]] = { - for { - (key, cf, get) <- gets - } yield { - client.get(get).addCallback { new Callback[FetchedCounts, util.ArrayList[KeyValue]] { - override def call(kvs: util.ArrayList[KeyValue]): FetchedCounts = { - val qualifierWithCounts = { - for { - kv <- kvs - eq = BytesUtilV1.toExactQualifier(kv.qualifier()) - } yield { - eq -> Bytes.toLong(kv.value()) - } - }.toMap -// println(s"$key $qualifierWithCounts") - FetchedCounts(key, qualifierWithCounts) - } - }} - } - } 
- Deferred.group(deferreds).addCallback { new Callback[Seq[FetchedCounts], util.ArrayList[FetchedCounts]] { - override def call(arg: util.ArrayList[FetchedCounts]): Seq[FetchedCounts] = { - for { - (key, fetchedGroup) <- Seq(arg: _*).groupBy(_.exactKey) - } yield { - fetchedGroup.reduce[FetchedCounts] { case (f1, f2) => - FetchedCounts(key, f1.qualifierWithCountMap ++ f2.qualifierWithCountMap) - } - } - }.toSeq - }} - } - } - - override def get(policy: Counter, - items: Seq[String], - timeRange: Seq[(TimedQualifier, TimedQualifier)], - dimQuery: Map[String, Set[String]]) - (implicit ec: ExecutionContext): Future[Seq[FetchedCountsGrouped]] = { - get(policy, items, timeRange).map { fetchedLs => - for { - FetchedCounts(exactKey, qualifierWithCountMap) <- fetchedLs - } yield { - val intervalWithCountMap = qualifierWithCountMap - .filter { case (eq, v) => eq.checkDimensionEquality(dimQuery) } - .groupBy { case (eq, v) => (eq.tq.q, eq.dimKeyValues) } - FetchedCountsGrouped(exactKey, intervalWithCountMap) - } - } - } - - override def update(policy: Counter, counts: Seq[(ExactKeyTrait, ExactValueMap)]): Map[ExactKeyTrait, ExactValueMap] = { - // increment mutation to hbase - val increments = { - for { - (exactKey, values) <- counts - inc = new Increment(BytesUtilV1.toBytes(exactKey)) - } yield { - for { - (eq, value) <- values - } { - inc.addColumn(intervalsMap.apply(eq.tq.q).toString.getBytes, BytesUtilV1.toBytes(eq), value) - } - // add column by dimension - inc - } - } - - val results: Array[Object] = Array.fill(increments.size)(null) - - withHBase(getTableName(policy)) { table => - table.batch(increments, results) - } match { - case Failure(ex) => - log.error(s"${ex.getMessage}") - case _ => - } - - assert(counts.length == results.length) - - for { - ((exactKey, eqWithValue), result) <- counts.zip(results) - } yield { - val eqWithResult = result match { - case r: Result => - for { - (eq, value) <- eqWithValue - } yield { - val interval = eq.tq.q - val cf = 
intervalsMap(interval) - val result = Option(r.getColumnLatestCell(cf.toString.getBytes, BytesUtilV1.toBytes(eq))).map { cell => - Bytes.toLong(CellUtil.cloneValue(cell)) - }.getOrElse(-1l) - eq -> result - } - case ex: Throwable => - log.error(s"${ex.getMessage}: $exactKey") - Nil - case _ => - log.error(s"result is null: $exactKey") - Nil - } - (exactKey, eqWithResult.toMap) - } - }.toMap - - override def delete(policy: Counter, keys: Seq[ExactKeyTrait]): Unit = { - withHBase(getTableName(policy)) { table => - table.delete { - for { - key <- keys - } yield { - new Delete(BytesUtilV1.toBytes(key)) - } - } - } match { - case Failure(ex) => - log.error(ex.getMessage) - case _ => - } - } - - override def get(policy: Counter, - queries: Seq[(ExactKeyTrait, Seq[ExactQualifier])]) - (implicit ex: ExecutionContext): Future[Seq[FetchedCounts]] = { - - val tableName = getTableName(policy) - - val gets = { - for { - (key, eqs) <- queries - (cf, eqsGrouped) <- eqs.groupBy(eq => intervalsMap(eq.tq.q)) - } yield { -// println(s"$key $eqsGrouped") - val get = new GetRequest(tableName, BytesUtilV1.toBytes(key)) - get.family(cf.toString) - get.qualifiers(eqsGrouped.map(BytesUtilV1.toBytes).toArray) - (key, cf, get) - } - } - - withAsyncHBase[Seq[FetchedCounts]] { client => - val deferreds: Seq[Deferred[FetchedCounts]] = { - for { - (key, cf, get) <- gets - } yield { - client.get(get).addCallback { new Callback[FetchedCounts, util.ArrayList[KeyValue]] { - override def call(kvs: util.ArrayList[KeyValue]): FetchedCounts = { - val qualifierWithCounts = { - for { - kv <- kvs - eq = BytesUtilV1.toExactQualifier(kv.qualifier()) - } yield { - eq -> Bytes.toLong(kv.value()) - } - }.toMap - FetchedCounts(key, qualifierWithCounts) - } - }} - } - } - Deferred.group(deferreds).addCallback { new Callback[Seq[FetchedCounts], util.ArrayList[FetchedCounts]] { - override def call(arg: util.ArrayList[FetchedCounts]): Seq[FetchedCounts] = arg - }} - } - } - - override def insertBlobValue(policy: 
Counter, keys: Seq[BlobExactKey]): Seq[Boolean] = { - val results: Array[Object] = Array.fill(keys.size)(null) - - val puts = keys.map { key => - val put = new Put(BytesUtilV1.toBytes(key)) - put.addColumn(blobCF, blobColumn, key.itemId.getBytes) - } - - withHBase(getTableName(policy)) { table => - table.batch(puts, results) - } match { - case Failure(ex) => - log.error(s"${ex.getMessage}") - case _ => - } - - for { - (result, key) <- results.zip(keys) - } yield { - Option(result).map(_ => true).getOrElse { - log.error(s"fail to insert blob value: $key") - false - } - } - } - - override def getBlobValue(policy: Counter, blobId: String): Option[String] = { - lazy val messageForLog = s"${policy.service}.${policy.action}.$blobId" - - policy.itemType match { - case ItemType.BLOB => - withHBase(getTableName(policy)) { table => - val rowKey = BytesUtilV1.toBytes(ExactKey(policy.id, policy.version, policy.itemType, blobId)) - val get = new Get(rowKey) - get.addColumn(blobCF, blobColumn) - table.get(get) - } match { - case Success(result) => - Option(result).filter(!_.isEmpty).map { rst => - Bytes.toString(rst.getValue(blobCF, blobColumn)) - } - case Failure(ex) => - throw ex - } - case _ => - log.warn(s"is not blob type counter. $messageForLog") - throw new Exception(s"is not blob type counter. 
$messageForLog") - } - } - - override def prepare(policy: Counter): Unit = { - // create hbase table - policy.hbaseTable.foreach { table => - if (!hbaseManagement.tableExists(s2config.HBASE_ZOOKEEPER_QUORUM, table)) { - hbaseManagement.createTable(s2config.HBASE_ZOOKEEPER_QUORUM, table, - ColumnFamily.values.map(_.toString).toList, 1) - hbaseManagement.setTTL(s2config.HBASE_ZOOKEEPER_QUORUM, table, ColumnFamily.SHORT.toString, policy.ttl) - policy.dailyTtl.foreach { i => - hbaseManagement.setTTL(s2config.HBASE_ZOOKEEPER_QUORUM, table, ColumnFamily.LONG.toString, i * 24 * 60 * 60) - } - } - } - } - - override def destroy(policy: Counter): Unit = { - - } - - override def ready(policy: Counter): Boolean = { - policy.hbaseTable.map { table => - hbaseManagement.tableExists(s2config.HBASE_ZOOKEEPER_QUORUM, table) - }.getOrElse(true) - } -}