http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2counter_core/src/main/scala/org/apache/s2graph/counter/helper/Management.scala
----------------------------------------------------------------------
diff --git a/s2counter_core/src/main/scala/org/apache/s2graph/counter/helper/Management.scala b/s2counter_core/src/main/scala/org/apache/s2graph/counter/helper/Management.scala
new file mode 100644
index 0000000..c507075
--- /dev/null
+++ b/s2counter_core/src/main/scala/org/apache/s2graph/counter/helper/Management.scala
@@ -0,0 +1,143 @@
+package org.apache.s2graph.counter.helper
+
+import com.typesafe.config.Config
+import org.apache.hadoop.hbase.client.{Admin, ConnectionFactory, Durability}
+import org.apache.hadoop.hbase.io.compress.Compression
+import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding
+import org.apache.hadoop.hbase.regionserver.BloomType
+import org.apache.hadoop.hbase.util.Bytes
+import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HTableDescriptor, TableName}
+import org.slf4j.LoggerFactory
+import redis.clients.jedis.ScanParams
+
+import scala.collection.JavaConversions._
+import scala.util.Random
+
+class Management(config: Config) {
+   val withRedis = new HashShardingJedis(config)
+
+   val log = LoggerFactory.getLogger(this.getClass)
+
+   def describe(zkAddr: String, tableName: String) = {
+     val admin = getAdmin(zkAddr)
+     val table = admin.getTableDescriptor(TableName.valueOf(tableName))
+
+     table.getColumnFamilies.foreach { cf =>
+       println(s"columnFamily: ${cf.getNameAsString}")
+       cf.getValues.foreach { case (k, v) =>
+         println(s"${Bytes.toString(k.get())} ${Bytes.toString(v.get())}")
+       }
+     }
+   }
+
+   def setTTL(zkAddr: String, tableName: String, cfName: String, ttl: Int) = {
+     val admin = getAdmin(zkAddr)
+     val tableNameObj = TableName.valueOf(tableName)
+     val table = admin.getTableDescriptor(tableNameObj)
+
+     val cf = table.getFamily(cfName.getBytes)
+     cf.setTimeToLive(ttl)
+
+     admin.modifyColumn(tableNameObj, cf)
+   }
+
+   def getAdmin(zkAddr: String): Admin = {
+     val conf = HBaseConfiguration.create()
+     conf.set("hbase.zookeeper.quorum", zkAddr)
+     val conn = ConnectionFactory.createConnection(conf)
+     conn.getAdmin
+   }
+
+   def tableExists(zkAddr: String, tableName: String): Boolean = {
+     getAdmin(zkAddr).tableExists(TableName.valueOf(tableName))
+   }
+
+   def createTable(zkAddr: String, tableName: String, cfs: List[String], regionMultiplier: Int) = {
+     log.info(s"create table: $tableName on $zkAddr, $cfs, $regionMultiplier")
+     val admin = getAdmin(zkAddr)
+     val regionCount = admin.getClusterStatus.getServersSize * regionMultiplier
+     try {
+       val desc = new HTableDescriptor(TableName.valueOf(tableName))
+       desc.setDurability(Durability.ASYNC_WAL)
+       for (cf <- cfs) {
+         val columnDesc = new HColumnDescriptor(cf)
+           .setCompressionType(Compression.Algorithm.LZ4)
+           .setBloomFilterType(BloomType.ROW)
+           .setDataBlockEncoding(DataBlockEncoding.FAST_DIFF)
+           .setMaxVersions(1)
+           .setMinVersions(0)
+           .setBlocksize(32768)
+           .setBlockCacheEnabled(true)
+         desc.addFamily(columnDesc)
+       }
+
+       if (regionCount <= 1) admin.createTable(desc)
+       else admin.createTable(desc, getStartKey(regionCount), getEndKey(regionCount), regionCount)
+     } catch {
+       case e: Throwable =>
+         log.error(s"$zkAddr, $tableName failed with $e", e)
+         throw e
+     }
+   }
+
+   // we only use murmur hash to distribute row key.
+   private def getStartKey(regionCount: Int) = {
+     Bytes.toBytes(Int.MaxValue / regionCount)
+   }
+
+   private def getEndKey(regionCount: Int) = {
+     Bytes.toBytes(Int.MaxValue / regionCount * (regionCount - 1))
+   }
+
+   case class RedisScanIterator(scanParams: ScanParams = new ScanParams().count(100)) extends Iterator[String] {
+     val nextCursorId: collection.mutable.Map[Int, String] = collection.mutable.Map.empty[Int, String]
+     var innerIterator: Iterator[String] = _
+
+     for {
+       i <- 0 until withRedis.jedisPoolSize
+     } {
+       nextCursorId.put(i, "0")
+     }
+
+     def callScan(): Unit = {
+       if (nextCursorId.nonEmpty) {
+ //        println(s"callScan: idx: $nextIdx, cursor: $nextCursorId")
+         val idx = Random.shuffle(nextCursorId.keys).head
+         val cursorId = nextCursorId(idx)
+         val pool = withRedis.getJedisPool(idx)
+         val conn = pool.getResource
+         try {
+           val result = conn.scan(cursorId, scanParams)
+           result.getStringCursor match {
+             case "0" =>
+               log.debug(s"end scan: idx: $idx, cursor: $cursorId")
+               nextCursorId.remove(idx)
+             case x: String =>
+               nextCursorId.put(idx, x)
+           }
+           innerIterator = result.getResult.toIterator
+         } finally {
+           pool.returnResource(conn)
+         }
+       }
+       else {
+         innerIterator = List.empty[String].toIterator
+       }
+     }
+
+     // initialize
+     callScan()
+
+     override def hasNext: Boolean = {
+       innerIterator.hasNext match {
+         case true =>
+           true
+         case false =>
+           callScan()
+           innerIterator.hasNext
+       }
+     }
+
+     override def next(): String = innerIterator.next()
+   }
+ }
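
A minimal usage sketch of the Management helper above (quorum, table name, and the "redis.instances" config key are hypothetical; the key is consumed by HashShardingJedis):

    import com.typesafe.config.ConfigFactory
    import org.apache.s2graph.counter.helper.Management

    val config = ConfigFactory.parseString("""redis.instances = ["localhost:6379"]""")
    val management = new Management(config)

    // pre-split into (#region servers * regionMultiplier) regions,
    // with split points derived from getStartKey/getEndKey
    management.createTable("localhost:2181", "s2counter_test", List("s", "l"), regionMultiplier = 2)
    management.setTTL("localhost:2181", "s2counter_test", "s", ttl = 86400) // one day
    management.describe("localhost:2181", "s2counter_test")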

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2counter_core/src/main/scala/org/apache/s2graph/counter/helper/WithHBase.scala
----------------------------------------------------------------------
diff --git a/s2counter_core/src/main/scala/org/apache/s2graph/counter/helper/WithHBase.scala b/s2counter_core/src/main/scala/org/apache/s2graph/counter/helper/WithHBase.scala
new file mode 100644
index 0000000..108c47a
--- /dev/null
+++ b/s2counter_core/src/main/scala/org/apache/s2graph/counter/helper/WithHBase.scala
@@ -0,0 +1,98 @@
+package org.apache.s2graph.counter.helper
+
+import com.stumbleupon.async.{Callback, Deferred}
+import com.typesafe.config.Config
+import org.apache.hadoop.hbase.client._
+import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
+import org.apache.s2graph.counter.config.S2CounterConfig
+import org.hbase.async.HBaseClient
+import org.slf4j.LoggerFactory
+import scala.concurrent.{Future, Promise}
+import scala.util.Try
+
+
+class WithHBase(config: Config) {
+  lazy val logger = LoggerFactory.getLogger(this.getClass)
+  lazy val s2config = new S2CounterConfig(config)
+
+  lazy val zkQuorum = s2config.HBASE_ZOOKEEPER_QUORUM
+  lazy val defaultTableName = s2config.HBASE_TABLE_NAME
+
+  logger.info(s"$zkQuorum, $defaultTableName")
+
+  val hbaseConfig = HBaseConfiguration.create()
+  s2config.getConfigMap("hbase").foreach { case (k, v) =>
+    hbaseConfig.set(k, v)
+  }
+
+//  lazy val conn: HConnection = HConnectionManager.createConnection(hbaseConfig)
+  lazy val conn: Connection = ConnectionFactory.createConnection(hbaseConfig)
+
+  val writeBufferSize = 1024 * 1024 * 2   // 2MB
+
+//  def apply[T](op: Table => T): Try[T] = {
+//    Try {
+//      val table = conn.getTable(TableName.valueOf(defaultTableName))
+//      // do not keep failed operation in writer buffer
+//      table.setWriteBufferSize(writeBufferSize)
+//      try {
+//        op(table)
+//      } catch {
+//        case e: Throwable =>
+//          logger.error(s"Operation to table($defaultTableName) is failed: ${e.getMessage}")
+//          throw e
+//      } finally {
+//        table.close()
+//      }
+//    }
+//  }
+  
+  def apply[T](tableName: String)(op: Table => T): Try[T] = {
+    Try {
+      val table = conn.getTable(TableName.valueOf(tableName))
+      // do not keep failed operation in writer buffer
+      table.setWriteBufferSize(writeBufferSize)
+      try {
+        op(table)
+      } catch {
+        case ex: Exception =>
+          logger.error(s"$ex: Operation to table($tableName) is failed")
+          throw ex
+      } finally {
+        table.close()
+      }
+    }
+  }
+}
+
+case class WithAsyncHBase(config: Config) {
+  lazy val logger = LoggerFactory.getLogger(this.getClass)
+  lazy val s2config = new S2CounterConfig(config)
+
+  lazy val zkQuorum = s2config.HBASE_ZOOKEEPER_QUORUM
+
+  val hbaseConfig = HBaseConfiguration.create()
+  s2config.getConfigMap("hbase").foreach { case (k, v) =>
+    hbaseConfig.set(k, v)
+  }
+
+//  lazy val conn: HConnection = HConnectionManager.createConnection(hbaseConfig)
+  lazy val client: HBaseClient = new HBaseClient(zkQuorum)
+
+  val writeBufferSize = 1024 * 1024 * 2   // 2MB
+
+  def apply[T](op: HBaseClient => Deferred[T]): Future[T] = {
+    val promise = Promise[T]()
+
+    op(client).addCallback(new Callback[Unit, T] {
+      def call(arg: T): Unit = {
+        promise.success(arg)
+      }
+    }).addErrback(new Callback[Unit, Exception] {
+      def call(ex: Exception): Unit = {
+        promise.failure(ex)
+      }
+    })
+    promise.future
+  }
+}
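
A sketch of the loan pattern WithHBase.apply implements: the table is opened, handed to the block, and closed in the finally, with the whole operation wrapped in Try (config, table, and row contents are hypothetical):

    import org.apache.hadoop.hbase.client.Put
    import org.apache.hadoop.hbase.util.Bytes
    import scala.util.Try

    val withHBase = new WithHBase(config)

    val result: Try[Unit] = withHBase("s2counter") { table =>
      val put = new Put(Bytes.toBytes("rowKey"))
      put.addColumn(Bytes.toBytes("l"), Bytes.toBytes("q"), Bytes.toBytes(1L))
      table.put(put)
    }

WithAsyncHBase plays the same role for asynchbase, bridging a Deferred[T] into a Scala Future[T] through a Promise.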

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2counter_core/src/main/scala/org/apache/s2graph/counter/helper/WithRedis.scala
----------------------------------------------------------------------
diff --git a/s2counter_core/src/main/scala/org/apache/s2graph/counter/helper/WithRedis.scala b/s2counter_core/src/main/scala/org/apache/s2graph/counter/helper/WithRedis.scala
new file mode 100644
index 0000000..a7b99c8
--- /dev/null
+++ b/s2counter_core/src/main/scala/org/apache/s2graph/counter/helper/WithRedis.scala
@@ -0,0 +1,59 @@
+package org.apache.s2graph.counter.helper
+
+import com.typesafe.config.Config
+import org.apache.s2graph.counter.config.S2CounterConfig
+import org.apache.s2graph.counter.util.Hashes
+import org.slf4j.LoggerFactory
+import redis.clients.jedis.exceptions.JedisException
+import redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig}
+
+import scala.util.Try
+
+class WithRedis(config: Config) {
+   lazy val s2config = new S2CounterConfig(config)
+
+   private val log = LoggerFactory.getLogger(getClass)
+
+   val poolConfig = new JedisPoolConfig()
+   poolConfig.setMaxTotal(150)
+   poolConfig.setMaxIdle(50)
+   poolConfig.setMaxWaitMillis(200)
+
+   val jedisPools = s2config.REDIS_INSTANCES.map { case (host, port) =>
+     new JedisPool(poolConfig, host, port)
+   }
+
+   def getBucketIdx(key: String): Int = {
+     Hashes.murmur3(key) % jedisPools.size
+   }
+
+   def doBlockWithIndex[T](idx: Int)(f: Jedis => T): Try[T] = {
+     Try {
+       val pool = jedisPools(idx)
+
+       var jedis: Jedis = null
+
+       try {
+         jedis = pool.getResource
+
+         f(jedis)
+       }
+       catch {
+         case e: JedisException =>
+           pool.returnBrokenResource(jedis)
+
+           jedis = null
+           throw e
+       }
+       finally {
+         if (jedis != null) {
+           pool.returnResource(jedis)
+         }
+       }
+     }
+   }
+
+   def doBlockWithKey[T](key: String)(f: Jedis => T): Try[T] = {
+     doBlockWithIndex(getBucketIdx(key))(f)
+   }
+ }
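
Callers hash the key to pick a shard, then borrow a connection from that shard's pool; a sketch (key hypothetical):

    val withRedis = new WithRedis(config)

    // runs on whichever shard murmur3("item:123") maps to
    val count = withRedis.doBlockWithKey("item:123") { jedis =>
      jedis.incr("item:123")
    }

On a JedisException the broken connection is handed back via returnBrokenResource and nulled out, so the finally block does not return it a second time.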

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2counter_core/src/main/scala/org/apache/s2graph/counter/models/CachedDBModel.scala
----------------------------------------------------------------------
diff --git a/s2counter_core/src/main/scala/org/apache/s2graph/counter/models/CachedDBModel.scala b/s2counter_core/src/main/scala/org/apache/s2graph/counter/models/CachedDBModel.scala
new file mode 100644
index 0000000..6e06caf
--- /dev/null
+++ b/s2counter_core/src/main/scala/org/apache/s2graph/counter/models/CachedDBModel.scala
@@ -0,0 +1,11 @@
+package org.apache.s2graph.counter.models
+
+import org.apache.s2graph.counter.util.{CollectionCache, CollectionCacheConfig}
+import scalikejdbc.AutoSession
+
+trait CachedDBModel[T] {
+  implicit val s = AutoSession
+
+  val cacheConfig: CollectionCacheConfig
+  lazy val cache = new CollectionCache[Option[T]](cacheConfig)
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2counter_core/src/main/scala/org/apache/s2graph/counter/models/Counter.scala
----------------------------------------------------------------------
diff --git a/s2counter_core/src/main/scala/org/apache/s2graph/counter/models/Counter.scala b/s2counter_core/src/main/scala/org/apache/s2graph/counter/models/Counter.scala
new file mode 100644
index 0000000..467e110
--- /dev/null
+++ b/s2counter_core/src/main/scala/org/apache/s2graph/counter/models/Counter.scala
@@ -0,0 +1,210 @@
+package org.apache.s2graph.counter.models
+
+import com.typesafe.config.Config
+import org.apache.s2graph.counter.config.S2CounterConfig
+import org.apache.s2graph.counter.util.{CollectionCache, CollectionCacheConfig}
+import scalikejdbc._
+
+case class Counter(id: Int, useFlag: Boolean, version: Byte, service: String, action: String,
+                   itemType: Counter.ItemType.ItemType, autoComb: Boolean, dimension: String,
+                   useProfile: Boolean, bucketImpId: Option[String],
+                   useRank: Boolean,
+                   ttl: Int, dailyTtl: Option[Int], hbaseTable: Option[String],
+                   intervalUnit: Option[String],
+                   rateActionId: Option[Int], rateBaseId: Option[Int], rateThreshold: Option[Int]) {
+  val intervals: Array[String] = intervalUnit.map(s => s.split(',')).getOrElse(Array("t", "M", "d", "H"))
+  val dimensionSp = if (dimension.isEmpty) Array.empty[String] else dimension.split(',').sorted
+
+  val dimensionList: List[Array[String]] = {
+    autoComb match {
+      case true =>
+        for {
+          i <- (0 to math.min(4, dimensionSp.length)).toList
+          combines <- dimensionSp.combinations(i)
+        } yield {
+          combines
+        }
+      case false =>
+        dimensionSp isEmpty match {
+          case true => List(Array())
+          case false => dimensionSp.toList.map(sp => sp.split('.'))
+        }
+    }
+  }
+
+  val dimensionSet: Set[Set[String]] = {
+    for {
+      arr <- dimensionList
+    } yield {
+      arr.toSet
+    }
+  }.toSet
+
+  val isRateCounter: Boolean = rateActionId.isDefined && rateBaseId.isDefined && rateActionId != rateBaseId
+  val isTrendCounter: Boolean = rateActionId.isDefined && rateBaseId.isDefined && rateActionId == rateBaseId
+}
+
+object Counter extends SQLSyntaxSupport[Counter] {
+   object ItemType extends Enumeration {
+     type ItemType = Value
+     val INT, LONG, STRING, BLOB = Value
+   }
+
+   def apply(c: SyntaxProvider[Counter])(rs: WrappedResultSet): Counter = apply(c.resultName)(rs)
+   def apply(r: ResultName[Counter])(rs: WrappedResultSet): Counter = {
+     lazy val itemType = Counter.ItemType(rs.int(r.itemType))
+     Counter(rs.int(r.id), rs.boolean(r.useFlag), rs.byte(r.version), rs.string(r.service), rs.string(r.action),
+       itemType, rs.boolean(r.autoComb), rs.string(r.dimension),
+       rs.boolean(r.useProfile), rs.stringOpt(r.bucketImpId),
+       rs.boolean(r.useRank),
+       rs.int(r.ttl), rs.intOpt(r.dailyTtl), rs.stringOpt(r.hbaseTable), rs.stringOpt(r.intervalUnit),
+       rs.intOpt(r.rateActionId), rs.intOpt(r.rateBaseId), rs.intOpt(r.rateThreshold))
+   }
+
+   def apply(useFlag: Boolean, version: Byte, service: String, action: String, itemType: Counter.ItemType.ItemType,
+             autoComb: Boolean, dimension: String, useProfile: Boolean = false, bucketImpId: Option[String] = None,
+             useRank: Boolean = false, ttl: Int = 259200, dailyTtl: Option[Int] = None,
+             hbaseTable: Option[String] = None, intervalUnit: Option[String] = None,
+             rateActionId: Option[Int] = None, rateBaseId: Option[Int] = None, rateThreshold: Option[Int] = None): Counter = {
+     Counter(-1, useFlag, version, service, action, itemType, autoComb, dimension,
+       useProfile, bucketImpId,
+       useRank, ttl, dailyTtl, hbaseTable,
+       intervalUnit, rateActionId, rateBaseId, rateThreshold)
+   }
+ }
+
+class CounterModel(config: Config) extends CachedDBModel[Counter] {
+   private lazy val s2Config = new S2CounterConfig(config)
+   // enable negative cache
+   override val cacheConfig: CollectionCacheConfig =
+     new CollectionCacheConfig(s2Config.CACHE_MAX_SIZE, s2Config.CACHE_TTL_SECONDS,
+       negativeCache = true, s2Config.CACHE_NEGATIVE_TTL_SECONDS)
+
+   val c = Counter.syntax("c")
+   val r = c.result
+
+   val multiCache = new CollectionCache[Seq[Counter]](cacheConfig)
+
+   def findById(id: Int, useCache: Boolean = true): Option[Counter] = {
+     lazy val sql = withSQL {
+       selectFrom(Counter as c).where.eq(c.id, id).and.eq(c.useFlag, 1)
+     }.map(Counter(c))
+
+     if (useCache) {
+       cache.withCache(s"_id:$id") {
+         sql.single().apply()
+       }
+     } else {
+       sql.single().apply()
+     }
+   }
+
+   def findByServiceAction(service: String, action: String, useCache: Boolean = true): Option[Counter] = {
+     lazy val sql = withSQL {
+       selectFrom(Counter as c).where.eq(c.service, service).and.eq(c.action, action).and.eq(c.useFlag, 1)
+     }.map(Counter(c))
+
+     if (useCache) {
+       cache.withCache(s"$service.$action") {
+         sql.single().apply()
+       }
+     }
+     else {
+       sql.single().apply()
+     }
+   }
+
+   def findByRateActionId(rateActionId: Int, useCache: Boolean = true): Seq[Counter] = {
+     lazy val sql = withSQL {
+       selectFrom(Counter as c).where.eq(c.rateActionId, rateActionId).and.ne(c.rateBaseId, rateActionId).and.eq(c.useFlag, 1)
+     }.map(Counter(c))
+
+     if (useCache) {
+       multiCache.withCache(s"_rate_action_id.$rateActionId") {
+         sql.list().apply()
+       }
+     } else {
+       sql.list().apply()
+     }
+   }
+
+   def findByRateBaseId(rateBaseId: Int, useCache: Boolean = true): Seq[Counter] = {
+     lazy val sql = withSQL {
+       selectFrom(Counter as c).where.eq(c.rateBaseId, rateBaseId).and.ne(c.rateActionId, rateBaseId).and.eq(c.useFlag, 1)
+     }.map(Counter(c))
+
+     if (useCache) {
+       multiCache.withCache(s"_rate_base_id.$rateBaseId") {
+         sql.list().apply()
+       }
+     } else {
+       sql.list().apply()
+     }
+   }
+
+   def findByTrendActionId(trendActionId: Int, useCache: Boolean = true): Seq[Counter] = {
+     lazy val sql = withSQL {
+       selectFrom(Counter as c).where.eq(c.rateActionId, trendActionId).and.eq(c.rateBaseId, trendActionId).and.eq(c.useFlag, 1)
+     }.map(Counter(c))
+
+     if (useCache) {
+       multiCache.withCache(s"_trend_action_id.$trendActionId") {
+         sql.list().apply()
+       }
+     } else {
+       sql.list().apply()
+     }
+   }
+
+   def createServiceAction(policy: Counter): Unit = {
+     withSQL {
+       val c = Counter.column
+       insert.into(Counter).namedValues(
+         c.useFlag -> policy.useFlag,
+         c.version -> policy.version,
+         c.service -> policy.service,
+         c.action -> policy.action,
+         c.itemType -> policy.itemType.id,
+         c.autoComb -> policy.autoComb,
+         c.dimension -> policy.dimension,
+         c.useProfile -> policy.useProfile,
+         c.bucketImpId -> policy.bucketImpId,
+         c.useRank -> policy.useRank,
+         c.ttl -> policy.ttl,
+         c.dailyTtl -> policy.dailyTtl,
+         c.hbaseTable -> policy.hbaseTable,
+         c.intervalUnit -> policy.intervalUnit,
+         c.rateActionId -> policy.rateActionId,
+         c.rateBaseId -> policy.rateBaseId,
+         c.rateThreshold -> policy.rateThreshold
+       )
+     }.update().apply()
+   }
+
+   def updateServiceAction(policy: Counter): Unit = {
+     withSQL {
+       val c = Counter.column
+       update(Counter).set(
+         c.autoComb -> policy.autoComb,
+         c.dimension -> policy.dimension,
+         c.useProfile -> policy.useProfile,
+         c.bucketImpId -> policy.bucketImpId,
+         c.useRank -> policy.useRank,
+         c.intervalUnit -> policy.intervalUnit,
+         c.rateActionId -> policy.rateActionId,
+         c.rateBaseId -> policy.rateBaseId,
+         c.rateThreshold -> policy.rateThreshold
+       ).where.eq(c.id, policy.id)
+     }.update().apply()
+   }
+
+   def deleteServiceAction(policy: Counter): Unit = {
+     withSQL {
+       val c = Counter.column
+       update(Counter).set(
+         c.action -> s"deleted_${System.currentTimeMillis()}_${policy.action}",
+         c.useFlag -> false
+       ).where.eq(c.id, policy.id)
+     }.update().apply()
+   }
+ }
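
To illustrate the autoComb expansion in the Counter case class above: with autoComb = true, dimensionList holds every combination of the sorted dimension keys up to size min(4, n). A sketch (policy values hypothetical):

    val policy = Counter(useFlag = true, version = 2, service = "svc", action = "like",
      itemType = Counter.ItemType.STRING, autoComb = true, dimension = "gender,age")

    policy.dimensionList.map(_.mkString("."))
    // => List("", "age", "gender", "age.gender")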

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2counter_core/src/main/scala/org/apache/s2graph/counter/models/DBModel.scala
----------------------------------------------------------------------
diff --git a/s2counter_core/src/main/scala/org/apache/s2graph/counter/models/DBModel.scala b/s2counter_core/src/main/scala/org/apache/s2graph/counter/models/DBModel.scala
new file mode 100644
index 0000000..1757a7f
--- /dev/null
+++ b/s2counter_core/src/main/scala/org/apache/s2graph/counter/models/DBModel.scala
@@ -0,0 +1,24 @@
+package org.apache.s2graph.counter.models
+
+import com.typesafe.config.Config
+import org.apache.s2graph.counter.config.S2CounterConfig
+import scalikejdbc._
+
+object DBModel {
+  private var initialized = false
+  
+  def initialize(config: Config): Unit = {
+    if (!initialized) {
+      this synchronized {
+        if (!initialized) {
+          val s2Config = new S2CounterConfig(config)
+          Class.forName(s2Config.DB_DEFAULT_DRIVER)
+          val settings = ConnectionPoolSettings(initialSize = 0, maxSize = 10, connectionTimeoutMillis = 5000L, validationQuery = "select 1;")
+
+          ConnectionPool.singleton(s2Config.DB_DEFAULT_URL, s2Config.DB_DEFAULT_USER, s2Config.DB_DEFAULT_PASSWORD, settings)
+          initialized = true
+        }
+      }
+    }
+  }
+}
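
Because of the double-checked locking above, DBModel.initialize can be called unconditionally from every entry point; only the first call opens the scalikejdbc pool:

    DBModel.initialize(config)  // opens the connection pool
    DBModel.initialize(config)  // no-op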

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2counter_core/src/main/scala/org/apache/s2graph/counter/package.scala
----------------------------------------------------------------------
diff --git a/s2counter_core/src/main/scala/org/apache/s2graph/counter/package.scala b/s2counter_core/src/main/scala/org/apache/s2graph/counter/package.scala
new file mode 100644
index 0000000..01bff0c
--- /dev/null
+++ b/s2counter_core/src/main/scala/org/apache/s2graph/counter/package.scala
@@ -0,0 +1,8 @@
+package org.apache.s2graph
+
+package object counter {
+  val VERSION_1: Byte = 1
+  val VERSION_2: Byte = 2
+
+  case class MethodNotSupportedException(message: String, cause: Throwable = null) extends Exception(message, cause)
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2counter_core/src/main/scala/org/apache/s2graph/counter/util/CartesianProduct.scala
----------------------------------------------------------------------
diff --git a/s2counter_core/src/main/scala/org/apache/s2graph/counter/util/CartesianProduct.scala b/s2counter_core/src/main/scala/org/apache/s2graph/counter/util/CartesianProduct.scala
new file mode 100644
index 0000000..b1712fa
--- /dev/null
+++ b/s2counter_core/src/main/scala/org/apache/s2graph/counter/util/CartesianProduct.scala
@@ -0,0 +1,8 @@
+package org.apache.s2graph.counter.util
+
+object CartesianProduct {
+  def apply[T](xss: List[List[T]]): List[List[T]] = xss match {
+    case Nil => List(Nil)
+    case h :: t => for(xh <- h; xt <- apply(t)) yield xh :: xt
+  }
+}
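
For example:

    CartesianProduct(List(List(1, 2), List(10, 20)))
    // => List(List(1, 10), List(1, 20), List(2, 10), List(2, 20))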

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2counter_core/src/main/scala/org/apache/s2graph/counter/util/CollectionCache.scala
----------------------------------------------------------------------
diff --git a/s2counter_core/src/main/scala/org/apache/s2graph/counter/util/CollectionCache.scala b/s2counter_core/src/main/scala/org/apache/s2graph/counter/util/CollectionCache.scala
new file mode 100644
index 0000000..74b5238
--- /dev/null
+++ b/s2counter_core/src/main/scala/org/apache/s2graph/counter/util/CollectionCache.scala
@@ -0,0 +1,66 @@
+package org.apache.s2graph.counter.util
+
+import java.net.InetAddress
+import java.util.concurrent.TimeUnit
+
+import com.google.common.cache.{Cache, CacheBuilder}
+import org.slf4j.LoggerFactory
+
+import scala.concurrent.{ExecutionContext, Future}
+import scala.language.{postfixOps, reflectiveCalls}
+
+case class CollectionCacheConfig(maxSize: Int, ttl: Int, negativeCache: Boolean = false, negativeTTL: Int = 600)
+
+class CollectionCache[C <: { def nonEmpty: Boolean; def isEmpty: Boolean }](config: CollectionCacheConfig) {
+  private val cache: Cache[String, C] = CacheBuilder.newBuilder()
+    .expireAfterWrite(config.ttl, TimeUnit.SECONDS)
+    .maximumSize(config.maxSize)
+    .build[String, C]()
+
+//  private lazy val cache = new SynchronizedLruMap[String, (C, Int)](config.maxSize)
+  private lazy val className = this.getClass.getSimpleName
+
+  private lazy val log = LoggerFactory.getLogger(this.getClass)
+  val localHostname = InetAddress.getLocalHost.getHostName
+
+  def size = cache.size
+  val maxSize = config.maxSize
+
+  // cache statistics
+  def getStatsString: String = {
+    s"$localHostname ${cache.stats().toString}"
+  }
+
+  def withCache(key: String)(op: => C): C = {
+    Option(cache.getIfPresent(key)) match {
+      case Some(r) => r
+      case None =>
+        val r = op
+        if (r.nonEmpty || config.negativeCache) {
+          cache.put(key, r)
+        }
+        r
+    }
+  }
+
+  def withCacheAsync(key: String)(op: => Future[C])(implicit ec: ExecutionContext): Future[C] = {
+    Option(cache.getIfPresent(key)) match {
+      case Some(r) => Future.successful(r)
+      case None =>
+        op.map { r =>
+          if (r.nonEmpty || config.negativeCache) {
+            cache.put(key, r)
+          }
+          r
+        }
+    }
+  }
+
+  def purgeKey(key: String) = {
+    cache.invalidate(key)
+  }
+
+  def contains(key: String): Boolean = {
+    Option(cache.getIfPresent(key)).nonEmpty
+  }
+}
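
A sketch of withCache with negative caching (config values hypothetical): because of the r.nonEmpty || config.negativeCache check, an empty result is stored as well, so repeated misses skip the expensive lookup:

    val cache = new CollectionCache[Option[String]](
      CollectionCacheConfig(maxSize = 10000, ttl = 600, negativeCache = true))

    def findName(id: Int): Option[String] = cache.withCache(s"name:$id") {
      None // e.g. a DB miss; cached anyway since negativeCache = true
    }

Note that negativeTTL is carried in the config, but the Guava cache above applies the single expireAfterWrite ttl to hits and misses alike.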

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2counter_core/src/main/scala/org/apache/s2graph/counter/util/FunctionParser.scala
----------------------------------------------------------------------
diff --git a/s2counter_core/src/main/scala/org/apache/s2graph/counter/util/FunctionParser.scala b/s2counter_core/src/main/scala/org/apache/s2graph/counter/util/FunctionParser.scala
new file mode 100644
index 0000000..e65910f
--- /dev/null
+++ b/s2counter_core/src/main/scala/org/apache/s2graph/counter/util/FunctionParser.scala
@@ -0,0 +1,21 @@
+package org.apache.s2graph.counter.util
+
+object FunctionParser {
+  val funcRe = """([a-zA-Z_]+)(\((\d+)?\))?""".r
+
+  def apply(str: String): Option[(String, String)] = {
+    str match {
+      case funcRe(funcName, funcParam, funcArg) =>
+        funcName match {
+          case x: String =>
+            Some((funcName, funcArg match {
+              case x: String => funcArg
+              case null => ""
+            }))
+          case null => None
+        }
+      case _ =>
+        None
+    }
+  }
+}
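
The regex accepts a bare name, empty parentheses, or a single numeric argument:

    FunctionParser("exp_decay(3600)") // Some(("exp_decay", "3600"))
    FunctionParser("latest()")        // Some(("latest", ""))
    FunctionParser("latest")          // Some(("latest", ""))
    FunctionParser("123")             // None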

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2counter_core/src/main/scala/org/apache/s2graph/counter/util/Hashes.scala
----------------------------------------------------------------------
diff --git a/s2counter_core/src/main/scala/org/apache/s2graph/counter/util/Hashes.scala b/s2counter_core/src/main/scala/org/apache/s2graph/counter/util/Hashes.scala
new file mode 100644
index 0000000..634b723
--- /dev/null
+++ b/s2counter_core/src/main/scala/org/apache/s2graph/counter/util/Hashes.scala
@@ -0,0 +1,21 @@
+package org.apache.s2graph.counter.util
+
+import org.apache.hadoop.hbase.util.Bytes
+
+import scala.util.hashing.MurmurHash3
+
+object Hashes {
+  def sha1(s: String): String = {
+    val md = java.security.MessageDigest.getInstance("SHA-1")
+    Bytes.toHex(md.digest(s.getBytes("UTF-8")))
+  }
+  
+  private def positiveHash(h: Int): Int = {
+    if (h < 0) -1 * (h + 1) else h
+  }
+
+  def murmur3(s: String): Int = {
+    val hash = MurmurHash3.stringHash(s)
+    positiveHash(hash)
+  }
+}
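
positiveHash maps Int.MinValue..-1 onto 0..Int.MaxValue without overflow, so murmur3 is always safe to use as a modulo shard index, as WithRedis.getBucketIdx does:

    val idx = Hashes.murmur3("user:42") % 4 // always in 0..3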

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2counter_core/src/main/scala/org/apache/s2graph/counter/util/ReduceMapValue.scala
----------------------------------------------------------------------
diff --git a/s2counter_core/src/main/scala/org/apache/s2graph/counter/util/ReduceMapValue.scala b/s2counter_core/src/main/scala/org/apache/s2graph/counter/util/ReduceMapValue.scala
new file mode 100644
index 0000000..037813b
--- /dev/null
+++ b/s2counter_core/src/main/scala/org/apache/s2graph/counter/util/ReduceMapValue.scala
@@ -0,0 +1,9 @@
+package org.apache.s2graph.counter.util
+
+class ReduceMapValue[T, U](op: (U, U) => U, default: U) {
+   def apply(m1: Map[T, U], m2: Map[T, U]): Map[T, U] = {
+     m1 ++ m2.map { case (k, v) =>
+       k -> op(m1.getOrElse(k, default), v)
+     }
+   }
+ }
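
For example, merging two count maps by addition; keys only in m1 pass through, keys in m2 are combined with m1's value or the default:

    val sumMaps = new ReduceMapValue[String, Long](_ + _, 0L)
    sumMaps(Map("a" -> 1L, "b" -> 2L), Map("b" -> 3L, "c" -> 4L))
    // => Map("a" -> 1, "b" -> 5, "c" -> 4)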

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2counter_core/src/main/scala/org/apache/s2graph/counter/util/Retry.scala
----------------------------------------------------------------------
diff --git a/s2counter_core/src/main/scala/org/apache/s2graph/counter/util/Retry.scala b/s2counter_core/src/main/scala/org/apache/s2graph/counter/util/Retry.scala
new file mode 100644
index 0000000..f49a231
--- /dev/null
+++ b/s2counter_core/src/main/scala/org/apache/s2graph/counter/util/Retry.scala
@@ -0,0 +1,44 @@
+package org.apache.s2graph.counter.util
+
+import scala.annotation.tailrec
+import scala.concurrent.{ExecutionContext, Future, Promise}
+import scala.util.{Failure, Success, Try}
+
+object Retry {
+  @tailrec
+  def apply[T](n: Int, withSleep: Boolean = true, tryCount: Int = 0)(fn: => T): T = {
+    Try { fn } match {
+      case Success(x) => x
+      case Failure(e) if e.isInstanceOf[RetryStopException] => throw e.getCause
+      case _ if n > 1 =>
+        // backoff
+        if (withSleep) Thread.sleep(tryCount * 1000)
+        apply(n - 1, withSleep, tryCount + 1)(fn)
+      case Failure(e) => throw e
+    }
+  }
+}
+
+object RetryAsync {
+  def apply[T](n: Int, withSleep: Boolean = true, tryCount: Int = 0)(fn: => Future[T])(implicit ex: ExecutionContext): Future[T] = {
+    val promise = Promise[T]()
+    fn onComplete {
+      case Success(x) => promise.success(x)
+      case Failure(e) if e.isInstanceOf[RetryStopException] => promise.failure(e.getCause)
+      case _ if n > 1 =>
+        // backoff
+        if (withSleep) Thread.sleep(tryCount * 1000)
+        apply(n - 1, withSleep, tryCount + 1)(fn)
+      case Failure(e) => promise.failure(e)
+    }
+    promise.future
+  }
+}
+
+class RetryStopException(message: String, cause: Throwable)
+  extends Exception(message, cause) {
+
+  def this(message: String) = this(message, null)
+
+  def this(cause: Throwable) = this(cause.toString, cause)
+}
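
A usage sketch: wrap a non-retriable failure in RetryStopException (through the cause constructor, so getCause is set) to abort early; any other failure is retried after a linear sleep of tryCount seconds (fetchRemote is hypothetical):

    val result = Retry(3) {
      try fetchRemote()
      catch {
        case e: IllegalArgumentException => throw new RetryStopException(e)
      }
    }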

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2counter_core/src/main/scala/org/apache/s2graph/counter/util/SplitBytes.scala
----------------------------------------------------------------------
diff --git a/s2counter_core/src/main/scala/org/apache/s2graph/counter/util/SplitBytes.scala b/s2counter_core/src/main/scala/org/apache/s2graph/counter/util/SplitBytes.scala
new file mode 100644
index 0000000..34f4cf4
--- /dev/null
+++ b/s2counter_core/src/main/scala/org/apache/s2graph/counter/util/SplitBytes.scala
@@ -0,0 +1,21 @@
+package org.apache.s2graph.counter.util
+
+object SplitBytes {
+  def apply(bytes: Array[Byte], sizes: Seq[Int]): Seq[Array[Byte]] = {
+    if (sizes.sum > bytes.length) {
+      throw new Exception(s"sizes.sum bigger than bytes.length: ${sizes.sum} > ${bytes.length}")
+    }
+
+    var position = 0
+    val rtn = {
+      for {
+        size <- sizes
+      } yield {
+        val slice = bytes.slice(position, position + size)
+        position += size
+        slice
+      }
+    }
+    rtn ++ Seq(bytes.drop(position))
+  }
+}
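
The remainder after the requested slices is always appended as a final element:

    SplitBytes(Array[Byte](1, 2, 3, 4, 5), Seq(2, 1))
    // => Seq(Array(1, 2), Array(3), Array(4, 5))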

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2counter_core/src/main/scala/org/apache/s2graph/counter/util/UnitConverter.scala
----------------------------------------------------------------------
diff --git a/s2counter_core/src/main/scala/org/apache/s2graph/counter/util/UnitConverter.scala b/s2counter_core/src/main/scala/org/apache/s2graph/counter/util/UnitConverter.scala
new file mode 100644
index 0000000..af6bc0c
--- /dev/null
+++ b/s2counter_core/src/main/scala/org/apache/s2graph/counter/util/UnitConverter.scala
@@ -0,0 +1,25 @@
+package org.apache.s2graph.counter.util
+
+object UnitConverter {
+  def toMillis(ts: Int): Long = {
+    ts * 1000L
+  }
+
+  def toMillis(ts: Long): Long = {
+    if (ts <= Int.MaxValue) {
+      ts * 1000
+    } else {
+      ts
+    }
+  }
+
+  def toMillis(s: String): Long = {
+    toMillis(s.toLong)
+  }
+
+  def toHours(ts: Long): Long = {
+    toMillis(ts) / HOUR_MILLIS * HOUR_MILLIS
+  }
+
+  val HOUR_MILLIS = 60 * 60 * 1000
+}
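
The Long overload treats anything that fits in an Int as seconds and anything larger as already-milliseconds, which holds for epoch timestamps until 2038:

    UnitConverter.toMillis(1445299200)      // Int seconds => 1445299200000L
    UnitConverter.toMillis(1445299200000L)  // already ms  => unchanged
    UnitConverter.toHours(1445301000000L)   // => 1445299200000L, floor of the hour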

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2counter_core/src/main/scala/s2/config/ConfigFunctions.scala
----------------------------------------------------------------------
diff --git a/s2counter_core/src/main/scala/s2/config/ConfigFunctions.scala b/s2counter_core/src/main/scala/s2/config/ConfigFunctions.scala
deleted file mode 100644
index 20d07ce..0000000
--- a/s2counter_core/src/main/scala/s2/config/ConfigFunctions.scala
+++ /dev/null
@@ -1,33 +0,0 @@
-package s2.config
-
-import com.typesafe.config.Config
-
-import scala.collection.JavaConversions._
-import scala.reflect.ClassTag
-
-/**
- * Created by hsleep(honeysl...@gmail.com) on 15. 3. 2..
- */
-abstract class ConfigFunctions(conf: Config) {
-  def getOrElse[T: ClassTag](key: String, default: T): T = {
-    val ret = if (conf.hasPath(key)) (default match {
-      case _: String => conf.getString(key)
-      case _: Int | _: Integer => conf.getInt(key)
-      case _: Float | _: Double => conf.getDouble(key)
-      case _: Boolean => conf.getBoolean(key)
-      case _ => default
-    }).asInstanceOf[T]
-    else default
-    println(s"${this.getClass.getName}: $key -> $ret")
-    ret
-  }
-
-  def getConfigMap(path: String): Map[String, String] = {
-    conf.getConfig(path).entrySet().map { entry =>
-      val key = s"$path.${entry.getKey}"
-      val value = conf.getString(key)
-      println(s"${this.getClass.getName}: $key -> $value")
-      key -> value
-    }.toMap
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2counter_core/src/main/scala/s2/config/S2CounterConfig.scala
----------------------------------------------------------------------
diff --git a/s2counter_core/src/main/scala/s2/config/S2CounterConfig.scala b/s2counter_core/src/main/scala/s2/config/S2CounterConfig.scala
deleted file mode 100644
index fcd0e6a..0000000
--- a/s2counter_core/src/main/scala/s2/config/S2CounterConfig.scala
+++ /dev/null
@@ -1,47 +0,0 @@
-package s2.config
-
-import com.typesafe.config.Config
-
-import scala.collection.JavaConversions._
-
-/**
- * Created by hsleep(honeysl...@gmail.com) on 15. 3. 2..
- */
-class S2CounterConfig(config: Config) extends ConfigFunctions(config) {
-  // HBase
-  lazy val HBASE_ZOOKEEPER_QUORUM = getOrElse("hbase.zookeeper.quorum", "")
-  lazy val HBASE_TABLE_NAME = getOrElse("hbase.table.name", "s2counter")
-  lazy val HBASE_TABLE_POOL_SIZE = getOrElse("hbase.table.pool.size", 100)
-  lazy val HBASE_CONNECTION_POOL_SIZE = getOrElse("hbase.connection.pool.size", 10)
-
-  lazy val HBASE_CLIENT_IPC_POOL_SIZE = getOrElse("hbase.client.ipc.pool.size", 5)
-  lazy val HBASE_CLIENT_MAX_TOTAL_TASKS = getOrElse("hbase.client.max.total.tasks", 100)
-  lazy val HBASE_CLIENT_MAX_PERSERVER_TASKS = getOrElse("hbase.client.max.perserver.tasks", 5)
-  lazy val HBASE_CLIENT_MAX_PERREGION_TASKS = getOrElse("hbase.client.max.perregion.tasks", 1)
-  lazy val HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD = getOrElse("hbase.client.scanner.timeout.period", 300)
-  lazy val HBASE_CLIENT_OPERATION_TIMEOUT = getOrElse("hbase.client.operation.timeout", 100)
-  lazy val HBASE_CLIENT_RETRIES_NUMBER = getOrElse("hbase.client.retries.number", 1)
-
-  // MySQL
-  lazy val DB_DEFAULT_DRIVER = getOrElse("db.default.driver", "com.mysql.jdbc.Driver")
-  lazy val DB_DEFAULT_URL = getOrElse("db.default.url", "")
-  lazy val DB_DEFAULT_USER = getOrElse("db.default.user", "graph")
-  lazy val DB_DEFAULT_PASSWORD = getOrElse("db.default.password", "graph")
-
-  // Redis
-  lazy val REDIS_INSTANCES = (for {
-    s <- config.getStringList("redis.instances")
-  } yield {
-    val sp = s.split(':')
-    (sp(0), if (sp.length > 1) sp(1).toInt else 6379)
-  }).toList
-
-  // Graph
-  lazy val GRAPH_URL = getOrElse("s2graph.url", "http://localhost:9000")
-  lazy val GRAPH_READONLY_URL = getOrElse("s2graph.read-only.url", GRAPH_URL)
-
-  // Cache
-  lazy val CACHE_TTL_SECONDS = getOrElse("cache.ttl.seconds", 600)
-  lazy val CACHE_MAX_SIZE = getOrElse("cache.max.size", 10000)
-  lazy val CACHE_NEGATIVE_TTL_SECONDS = getOrElse("cache.negative.ttl.seconds", CACHE_TTL_SECONDS)
-}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2counter_core/src/main/scala/s2/counter/TrxLog.scala
----------------------------------------------------------------------
diff --git a/s2counter_core/src/main/scala/s2/counter/TrxLog.scala b/s2counter_core/src/main/scala/s2/counter/TrxLog.scala
deleted file mode 100644
index c1db356..0000000
--- a/s2counter_core/src/main/scala/s2/counter/TrxLog.scala
+++ /dev/null
@@ -1,28 +0,0 @@
-package s2.counter
-
-import play.api.libs.json.Json
-
-/**
- * Created by hsleep(honeysl...@gmail.com) on 15. 4. 7..
- */
-// item1 -> likedCount -> month:2015-10, 1
-// edge
-  // policyId = Label.findByName(likedCount).id.get
-  // item = edge.srcVertexId
-  // results =
-case class TrxLog(success: Boolean, policyId: Int, item: String, results: Iterable[TrxLogResult])
-
-// interval = m, ts = 2015-10, "age.gender.20.M", 1, 2
-case class TrxLogResult(interval: String, ts: Long, dimension: String, value: Long, result: Long = -1)
-
-object TrxLogResult {
-  implicit val writes = Json.writes[TrxLogResult]
-  implicit val reads = Json.reads[TrxLogResult]
-  implicit val formats = Json.format[TrxLogResult]
-}
-
-object TrxLog {
-  implicit val writes = Json.writes[TrxLog]
-  implicit val reads = Json.reads[TrxLog]
-  implicit val formats = Json.format[TrxLog]
-}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2counter_core/src/main/scala/s2/counter/core/BytesUtil.scala
----------------------------------------------------------------------
diff --git a/s2counter_core/src/main/scala/s2/counter/core/BytesUtil.scala b/s2counter_core/src/main/scala/s2/counter/core/BytesUtil.scala
deleted file mode 100644
index 1d945ed..0000000
--- a/s2counter_core/src/main/scala/s2/counter/core/BytesUtil.scala
+++ /dev/null
@@ -1,15 +0,0 @@
-package s2.counter.core
-
-/**
- * Created by hsleep(honeysl...@gmail.com) on 15. 6. 11..
- */
-trait BytesUtil {
-  def getRowKeyPrefix(id: Int): Array[Byte]
-
-  def toBytes(key: ExactKeyTrait): Array[Byte]
-  def toBytes(eq: ExactQualifier): Array[Byte]
-  def toBytes(tq: TimedQualifier): Array[Byte]
-
-  def toExactQualifier(bytes: Array[Byte]): ExactQualifier
-  def toTimedQualifier(bytes: Array[Byte]): TimedQualifier
-}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2counter_core/src/main/scala/s2/counter/core/ExactCounter.scala
----------------------------------------------------------------------
diff --git a/s2counter_core/src/main/scala/s2/counter/core/ExactCounter.scala b/s2counter_core/src/main/scala/s2/counter/core/ExactCounter.scala
deleted file mode 100644
index 7f36681..0000000
--- a/s2counter_core/src/main/scala/s2/counter/core/ExactCounter.scala
+++ /dev/null
@@ -1,247 +0,0 @@
-package s2.counter.core
-
-import com.typesafe.config.Config
-import org.slf4j.LoggerFactory
-import s2.counter.core.TimedQualifier.IntervalUnit
-import s2.counter.core.TimedQualifier.IntervalUnit.IntervalUnit
-import s2.counter.decay.ExpDecayFormula
-import s2.counter.{TrxLog, TrxLogResult}
-import s2.models.Counter
-import s2.util.{CollectionCache, CollectionCacheConfig, FunctionParser}
-
-import scala.concurrent.duration._
-import scala.concurrent.{Await, ExecutionContext, Future}
-
-/**
- * Created by hsleep(honeysl...@gmail.com) on 15. 6. 11..
- */
-
-case class ExactCounterRow(key: ExactKeyTrait, value: Map[ExactQualifier, Long])
-
-case class FetchedCounts(exactKey: ExactKeyTrait, qualifierWithCountMap: Map[ExactQualifier, Long])
-case class DecayedCounts(exactKey: ExactKeyTrait, qualifierWithCountMap: Map[ExactQualifier, Double])
-
-case class FetchedCountsGrouped(exactKey: ExactKeyTrait, intervalWithCountMap: Map[(IntervalUnit, Map[String, String]), Map[ExactQualifier, Long]])
-
-class ExactCounter(config: Config, storage: ExactStorage) {
-  import ExactCounter._
-
-  val syncDuration = Duration(10, SECONDS)
-  private val log = LoggerFactory.getLogger(getClass)
-
-  val storageStatusCache = new CollectionCache[Option[Boolean]](CollectionCacheConfig(1000, 60, negativeCache = false, 60))
-  
-  // dimension: age, value of ages
-  def getCountAsync(policy: Counter,
-                    itemId: String,
-                    intervals: Seq[IntervalUnit],
-                    from: Long,
-                    to: Long,
-                    dimension: Map[String, Set[String]])
-                   (implicit ex: ExecutionContext): Future[Option[FetchedCountsGrouped]] = {
-    for {
-      fetchedCounts <- getCountsAsync(policy, Seq(itemId),
-        intervals.map(interval => (TimedQualifier(interval, from), TimedQualifier(interval, to))), dimension)
-    } yield {
-      fetchedCounts.headOption
-    }
-  }
-
-  // multi item, time range and multi dimension
-  def getCountsAsync(policy: Counter,
-                     items: Seq[String],
-                     timeRange: Seq[(TimedQualifier, TimedQualifier)],
-                     dimQuery: Map[String, Set[String]])
-                    (implicit ex: ExecutionContext): Future[Seq[FetchedCountsGrouped]] = {
-    storage.get(policy, items, timeRange, dimQuery)
-  }
-
-  def getCount(policy: Counter,
-               itemId: String,
-               intervals: Seq[IntervalUnit],
-               from: Long,
-               to: Long,
-               dimension: Map[String, Set[String]])
-              (implicit ex: ExecutionContext): Option[FetchedCountsGrouped] = {
-    getCounts(policy, Seq(itemId),
-      intervals.map(interval => (TimedQualifier(interval, from), TimedQualifier(interval, to))), dimension).headOption
-  }
-
-  def getCount(policy: Counter,
-               itemId: String,
-               intervals: Seq[IntervalUnit],
-               from: Long,
-               to: Long)
-              (implicit ex: ExecutionContext): Option[FetchedCounts] = {
-    val future = storage.get(policy,
-      Seq(itemId),
-      intervals.map(interval => (TimedQualifier(interval, from), TimedQualifier(interval, to))))
-    Await.result(future, syncDuration).headOption
-  }
-
-  // multi item, time range and multi dimension
-  def getCounts(policy: Counter,
-                items: Seq[String],
-                timeRange: Seq[(TimedQualifier, TimedQualifier)],
-                dimQuery: Map[String, Set[String]])
-               (implicit ex: ExecutionContext): Seq[FetchedCountsGrouped] = {
-    Await.result(storage.get(policy, items, timeRange, dimQuery), syncDuration)
-  }
-
-  def getRelatedCounts(policy: Counter, keyWithQualifiers: Seq[(String, Seq[ExactQualifier])])
-                      (implicit ex: ExecutionContext): Map[String, Map[ExactQualifier, Long]] = {
-    val queryKeyWithQualifiers = {
-      for {
-        (itemKey, qualifiers) <- keyWithQualifiers
-      } yield {
-        val relKey = ExactKey(policy.id, policy.version, policy.itemType, itemKey)
-        (relKey, qualifiers)
-      }
-    }
-    val future = storage.get(policy, queryKeyWithQualifiers)
-
-    for {
-      FetchedCounts(exactKey, exactQualifierToLong) <- Await.result(future, syncDuration)
-    } yield {
-      exactKey.itemKey -> exactQualifierToLong
-    }
-  }.toMap
-
-  def getPastCounts(policy: Counter, keyWithQualifiers: Seq[(String, Seq[ExactQualifier])])
-                   (implicit ex: ExecutionContext): Map[String, Map[ExactQualifier, Long]] = {
-    // query past counts
-    val queryKeyWithQualifiers = {
-      for {
-        (itemKey, qualifiers) <- keyWithQualifiers
-      } yield {
-        val relKey = ExactKey(policy.id, policy.version, policy.itemType, itemKey)
-        (relKey, qualifiers.map(eq => eq.copy(tq = eq.tq.add(-1))))
-      }
-    }
-    val future = storage.get(policy, queryKeyWithQualifiers)
-
-    for {
-      FetchedCounts(exactKey, exactQualifierToLong) <- Await.result(future, syncDuration)
-    } yield {
-      // restore tq
-      exactKey.itemKey -> exactQualifierToLong.map { case (eq, v) =>
-        eq.copy(tq = eq.tq.add(1)) -> v
-      }
-    }
-  }.toMap
-
-  def getDecayedCountsAsync(policy: Counter,
-                            items: Seq[String],
-                            timeRange: Seq[(TimedQualifier, TimedQualifier)],
-                            dimQuery: Map[String, Set[String]],
-                            qsSum: Option[String])(implicit ex: ExecutionContext): Future[Seq[DecayedCounts]] = {
-    val groupedTimeRange = timeRange.groupBy(_._1.q)
-    getCountsAsync(policy, items, timeRange, dimQuery).map { seq =>
-      for {
-        FetchedCountsGrouped(k, intervalWithCountMap) <- seq
-      } yield {
-        DecayedCounts(k, {
-          for {
-            ((interval, dimKeyValues), grouped) <- intervalWithCountMap
-          } yield {
-            val (tqFrom, tqTo) = groupedTimeRange(interval).head
-            val formula = {
-              for {
-                strSum <- qsSum
-                (func, arg) <- FunctionParser(strSum)
-              } yield {
-                // apply function
-                func.toLowerCase match {
-                  case "exp_decay" => ExpDecayFormula.byMeanLifeTime(arg.toLong * TimedQualifier.getTsUnit(interval))
-                  case _ => throw new UnsupportedOperationException(s"unknown function: $strSum")
-                }
-              }
-            }
-            ExactQualifier(tqFrom, dimKeyValues) -> {
-              grouped.map { case (eq, count) =>
-                formula match {
-                  case Some(decay) =>
-                    decay(count, tqTo.ts - eq.tq.ts)
-                  case None =>
-                    count
-                }
-              }.sum
-            }
-          }
-        })
-      }
-    }
-  }
-
-  def updateCount(policy: Counter, counts: Seq[(ExactKeyTrait, ExactValueMap)]): Seq[TrxLog] = {
-    ready(policy) match {
-      case true =>
-        val updateResults = storage.update(policy, counts)
-        for {
-          (exactKey, values) <- counts
-          results = updateResults.getOrElse(exactKey, Nil.toMap)
-        } yield {
-          TrxLog(results.nonEmpty, exactKey.policyId, exactKey.itemKey, makeTrxLogResult(values, results))
-        }
-      case false =>
-        Nil
-    }
-  }
-
-  def deleteCount(policy: Counter, keys: Seq[ExactKeyTrait]): Unit = {
-    storage.delete(policy, keys)
-  }
-
-  private def makeTrxLogResult(values: ExactValueMap, results: ExactValueMap): Seq[TrxLogResult] = {
-    for {
-      (eq, value) <- values
-    } yield {
-      val result = results.getOrElse(eq, -1l)
-      TrxLogResult(eq.tq.q.toString, eq.tq.ts, eq.dimension, value, result)
-    }
-  }.toSeq
-
-  def insertBlobValue(policy: Counter, keys: Seq[BlobExactKey]): Seq[Boolean] = {
-    storage.insertBlobValue(policy, keys)
-  }
-
-  def getBlobValue(policy: Counter, blobId: String): Option[String] = {
-    storage.getBlobValue(policy, blobId)
-  }
-
-  def prepare(policy: Counter) = {
-    storage.prepare(policy)
-  }
-
-  def destroy(policy: Counter) = {
-    storage.destroy(policy)
-  }
-
-  def ready(policy: Counter): Boolean = {
-    storageStatusCache.withCache(s"${policy.id}") {
-      val ready = storage.ready(policy)
-      if (!ready) {
-        // if key is not in cache, log message
-        log.warn(s"${policy.service}.${policy.action} storage is not ready.")
-      }
-      Some(ready)
-    }.getOrElse(false)
-  }
-}
-
-object ExactCounter {
-  object ColumnFamily extends Enumeration {
-    type ColumnFamily = Value
-
-    val SHORT = Value("s")
-    val LONG = Value("l")
-  }
-  import IntervalUnit._
-  val intervalsMap = Map(MINUTELY -> ColumnFamily.SHORT, HOURLY -> ColumnFamily.SHORT,
-    DAILY -> ColumnFamily.LONG, MONTHLY -> ColumnFamily.LONG, TOTAL -> ColumnFamily.LONG)
-
-  val blobCF = ColumnFamily.LONG.toString.getBytes
-  val blobColumn = "b".getBytes
-
-  type ExactValueMap = Map[ExactQualifier, Long]
-}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2counter_core/src/main/scala/s2/counter/core/ExactKey.scala
----------------------------------------------------------------------
diff --git a/s2counter_core/src/main/scala/s2/counter/core/ExactKey.scala b/s2counter_core/src/main/scala/s2/counter/core/ExactKey.scala
deleted file mode 100644
index 63aca51..0000000
--- a/s2counter_core/src/main/scala/s2/counter/core/ExactKey.scala
+++ /dev/null
@@ -1,29 +0,0 @@
-package s2.counter.core
-
-import s2.models.Counter
-import s2.models.Counter.ItemType
-import s2.models.Counter.ItemType.ItemType
-import s2.util.Hashes
-
-/**
- * Created by hsleep(honeysl...@gmail.com) on 15. 5. 27..
- */
-trait ExactKeyTrait {
-  def policyId: Int
-  def version: Byte
-  def itemType: ItemType
-  def itemKey: String
-}
-
-case class ExactKey(policyId: Int, version: Byte, itemType: ItemType, itemKey: String) extends ExactKeyTrait
-case class BlobExactKey(policyId: Int, version: Byte, itemType: ItemType, itemKey: String, itemId: String) extends ExactKeyTrait
-
-object ExactKey {
-  def apply(policy: Counter, itemId: String, checkItemType: Boolean): ExactKeyTrait = {
-    if (checkItemType && policy.itemType == ItemType.BLOB) {
-      BlobExactKey(policy.id, policy.version, ItemType.BLOB, Hashes.sha1(itemId), itemId)
-    } else {
-      ExactKey(policy.id, policy.version, policy.itemType, itemId)
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2counter_core/src/main/scala/s2/counter/core/ExactQualifier.scala
----------------------------------------------------------------------
diff --git a/s2counter_core/src/main/scala/s2/counter/core/ExactQualifier.scala b/s2counter_core/src/main/scala/s2/counter/core/ExactQualifier.scala
deleted file mode 100644
index f4ac708..0000000
--- a/s2counter_core/src/main/scala/s2/counter/core/ExactQualifier.scala
+++ /dev/null
@@ -1,78 +0,0 @@
-package s2.counter.core
-
-import java.util
-
-import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}
-import s2.counter.core.TimedQualifier.IntervalUnit.IntervalUnit
-
-import scala.collection.JavaConversions._
-
-/**
- * Created by hsleep(honeysl...@gmail.com) on 15. 5. 27..
- */
-case class ExactQualifier(tq: TimedQualifier, dimKeyValues: Map[String, String], dimension: String) {
-  def checkDimensionEquality(dimQuery: Map[String, Set[String]]): Boolean = {
-//    println(s"self: $dimKeyValues, query: $dimQuery")
-    dimQuery.size == dimKeyValues.size && {
-      for {
-        (k, v) <- dimKeyValues
-      } yield {
-        dimQuery.get(k).exists(qv => qv.isEmpty || qv.contains(v))
-      }
-    }.forall(x => x)
-  }
-}
-
-object ExactQualifier {
-  val cache: LoadingCache[String, Map[String, String]] = CacheBuilder.newBuilder()
-    .maximumSize(10000)
-    .build(
-      new CacheLoader[String, Map[String, String]]() {
-        def load(s: String): Map[String, String] = {
-          strToDimensionMap(s)
-        }
-      }
-    )
-
-  def apply(tq: TimedQualifier, dimension: String): ExactQualifier = {
-    ExactQualifier(tq, cache.get(dimension), dimension)
-  }
-
-  def apply(tq: TimedQualifier, dimKeyValues: Map[String, String]): ExactQualifier = {
-    ExactQualifier(tq, dimKeyValues, makeDimensionStr(dimKeyValues))
-  }
-
-  def makeSortedDimension(dimKeyValues: Map[String, String]): Iterator[String] = {
-    val sortedDimKeyValues = new util.TreeMap[String, String](dimKeyValues)
-    sortedDimKeyValues.keysIterator ++ sortedDimKeyValues.valuesIterator
-  }
-
-  def makeDimensionStr(dimKeyValues: Map[String, String]): String = {
-    makeSortedDimension(dimKeyValues).mkString(".")
-  }
-
-  def getQualifiers(intervals: Seq[IntervalUnit], ts: Long, dimKeyValues: Map[String, String]): Seq[ExactQualifier] = {
-    for {
-      tq <- TimedQualifier.getQualifiers(intervals, ts)
-    } yield {
-      ExactQualifier(tq, dimKeyValues, makeDimensionStr(dimKeyValues))
-    }
-  }
-
-  def strToDimensionMap(dimension: String): Map[String, String] = {
-    val dimSp = {
-      val sp = dimension.split('.')
-      if (dimension == ".") {
-        Array("", "")
-      }
-      else if (dimension.nonEmpty && dimension.last == '.') {
-        sp ++ Array("")
-      } else {
-        sp
-      }
-    }
-    val dimKey = dimSp.take(dimSp.length / 2)
-    val dimVal = dimSp.takeRight(dimSp.length / 2)
-    dimKey.zip(dimVal).toMap
-  }
-}
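
The dimension string encodes the sorted keys followed by their values, so makeDimensionStr and strToDimensionMap round-trip (compare the "age.gender.20.M" example in the TrxLog comments above):

    ExactQualifier.makeDimensionStr(Map("gender" -> "M", "age" -> "20"))
    // => "age.gender.20.M"
    ExactQualifier.strToDimensionMap("age.gender.20.M")
    // => Map("age" -> "20", "gender" -> "M")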

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2counter_core/src/main/scala/s2/counter/core/ExactStorage.scala
----------------------------------------------------------------------
diff --git a/s2counter_core/src/main/scala/s2/counter/core/ExactStorage.scala b/s2counter_core/src/main/scala/s2/counter/core/ExactStorage.scala
deleted file mode 100644
index 6a81f41..0000000
--- a/s2counter_core/src/main/scala/s2/counter/core/ExactStorage.scala
+++ /dev/null
@@ -1,35 +0,0 @@
-package s2.counter.core
-
-import s2.counter.core.ExactCounter.ExactValueMap
-import s2.models.Counter
-
-import scala.concurrent.{ExecutionContext, Future}
-
-/**
- * Created by shon on 8/12/15.
- */
-trait ExactStorage {
-  // for range query and check dimension
-  def get(policy: Counter,
-          items: Seq[String],
-          timeRange: Seq[(TimedQualifier, TimedQualifier)],
-          dimQuery: Map[String, Set[String]])
-         (implicit ec: ExecutionContext): Future[Seq[FetchedCountsGrouped]]
-  // for range query
-  def get(policy: Counter,
-          items: Seq[String],
-          timeRange: Seq[(TimedQualifier, TimedQualifier)])
-         (implicit ec: ExecutionContext): Future[Seq[FetchedCounts]]
-  // for query exact qualifier
-  def get(policy: Counter,
-          queries: Seq[(ExactKeyTrait, Seq[ExactQualifier])])
-         (implicit ec: ExecutionContext): Future[Seq[FetchedCounts]]
-  def update(policy: Counter, counts: Seq[(ExactKeyTrait, ExactValueMap)]): Map[ExactKeyTrait, ExactValueMap]
-  def delete(policy: Counter, keys: Seq[ExactKeyTrait]): Unit
-  def insertBlobValue(policy: Counter, keys: Seq[BlobExactKey]): Seq[Boolean]
-  def getBlobValue(policy: Counter, blobId: String): Option[String]
-
-  def prepare(policy: Counter): Unit
-  def destroy(policy: Counter): Unit
-  def ready(policy: Counter): Boolean
-}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2counter_core/src/main/scala/s2/counter/core/RankingCounter.scala
----------------------------------------------------------------------
diff --git a/s2counter_core/src/main/scala/s2/counter/core/RankingCounter.scala b/s2counter_core/src/main/scala/s2/counter/core/RankingCounter.scala
deleted file mode 100644
index b98ef30..0000000
--- a/s2counter_core/src/main/scala/s2/counter/core/RankingCounter.scala
+++ /dev/null
@@ -1,105 +0,0 @@
-package s2.counter.core
-
-import java.util.concurrent.TimeUnit
-
-import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}
-import com.typesafe.config.Config
-import org.slf4j.LoggerFactory
-import s2.counter.core.RankingCounter.RankingValueMap
-import s2.models.Counter
-import s2.util.{CollectionCache, CollectionCacheConfig}
-
-import scala.collection.JavaConversions._
-
-/**
- * Created by hsleep(honeysl...@gmail.com) on 15. 6. 19..
- */
-case class RankingRow(key: RankingKey, value: Map[String, RankingValue])
-case class RateRankingRow(key: RankingKey, value: Map[String, RateRankingValue])
-
-class RankingCounter(config: Config, storage: RankingStorage) {
-  private val log = LoggerFactory.getLogger(getClass)
-
-  val storageStatusCache = new CollectionCache[Option[Boolean]](CollectionCacheConfig(1000, 60, negativeCache = false, 60))
-
-  val cache: LoadingCache[RankingKey, RankingResult] = CacheBuilder.newBuilder()
-    .maximumSize(1000000)
-    .expireAfterWrite(10l, TimeUnit.MINUTES)
-    .build(
-      new CacheLoader[RankingKey, RankingResult]() {
-        def load(rankingKey: RankingKey): RankingResult = {
-//          log.warn(s"cache load: $rankingKey")
-          storage.getTopK(rankingKey, Int.MaxValue).getOrElse(RankingResult(-1, Nil))
-        }
-      }
-    )
-
-  def getTopK(rankingKey: RankingKey, k: Int = Int.MaxValue): Option[RankingResult] = {
-    val tq = rankingKey.eq.tq
-    if (TimedQualifier.getQualifiers(Seq(tq.q), System.currentTimeMillis()).head == tq) {
-      // do not use cache
-      storage.getTopK(rankingKey, k)
-    }
-    else {
-      val result = cache.get(rankingKey)
-      if (result.values.nonEmpty) {
-        Some(result.copy(values = result.values.take(k)))
-      }
-      else {
-        None
-      }
-    }
-  }
-
-  def update(key: RankingKey, value: RankingValueMap, k: Int): Unit = {
-    storage.update(key, value, k)
-  }
-
-  def update(values: Seq[(RankingKey, RankingValueMap)], k: Int): Unit = {
-    storage.update(values, k)
-  }
-
-  def delete(key: RankingKey): Unit = {
-    storage.delete(key)
-  }
-
-  def getAllItems(keys: Seq[RankingKey], k: Int = Int.MaxValue): Seq[String] = {
-    val oldKeys = keys.filter(key => TimedQualifier.getQualifiers(Seq(key.eq.tq.q), System.currentTimeMillis()).head != key.eq.tq)
-    val cached = cache.getAllPresent(oldKeys)
-    val missed = keys.diff(cached.keys.toSeq)
-    val found = storage.getTopK(missed, k)
-
-//    log.warn(s"cached: ${cached.size()}, missed: ${missed.size}")
-
-    for {
-      (key, result) <- found
-    } {
-      cache.put(key, result)
-    }
-
-    for {
-      (key, RankingResult(totalScore, values)) <- cached ++ found
-      (item, score) <- values
-    } yield {
-      item
-    }
-  }.toSeq.distinct
-
-  def prepare(policy: Counter): Unit = {
-    storage.prepare(policy)
-  }
-
-  def destroy(policy: Counter): Unit = {
-    storage.destroy(policy)
-  }
-
-  def ready(policy: Counter): Boolean = {
-    storageStatusCache.withCache(s"${policy.id}") {
-      Some(storage.ready(policy))
-    }.getOrElse(false)
-  }
-}
-
-object RankingCounter {
-  type RankingValueMap = Map[String, RankingValue]
-}
\ No newline at end of file
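
RankingCounter above is a read-through cache in front of RankingStorage: closed time buckets are immutable, so getTopK serves them from a Guava LoadingCache, while the still-open current bucket always goes to storage. The caching pattern, reduced to a self-contained sketch (Key and Result are illustrative stand-ins for RankingKey and RankingResult):

    import java.util.concurrent.TimeUnit
    import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}

    object LoadingCacheSketch {
      case class Key(bucket: String)
      case class Result(values: Seq[(String, Double)])

      // the loader runs only on a cache miss; entries expire ten minutes
      // after write, which bounds staleness for closed buckets
      val cache: LoadingCache[Key, Result] = CacheBuilder.newBuilder()
        .maximumSize(1000000)
        .expireAfterWrite(10L, TimeUnit.MINUTES)
        .build(new CacheLoader[Key, Result]() {
          override def load(key: Key): Result =
            Result(Nil) // a real loader would call storage.getTopK here
        })

      def main(args: Array[String]): Unit =
        println(cache.get(Key("daily:20150819"))) // miss -> load -> cached
    }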

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2counter_core/src/main/scala/s2/counter/core/RankingKey.scala
----------------------------------------------------------------------
diff --git a/s2counter_core/src/main/scala/s2/counter/core/RankingKey.scala b/s2counter_core/src/main/scala/s2/counter/core/RankingKey.scala
deleted file mode 100644
index 7d70625..0000000
--- a/s2counter_core/src/main/scala/s2/counter/core/RankingKey.scala
+++ /dev/null
@@ -1,6 +0,0 @@
-package s2.counter.core
-
-/**
- * Created by hsleep(honeysl...@gmail.com) on 15. 6. 19..
- */
-case class RankingKey(policyId: Int, version: Byte, eq: ExactQualifier)

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2counter_core/src/main/scala/s2/counter/core/RankingResult.scala
----------------------------------------------------------------------
diff --git a/s2counter_core/src/main/scala/s2/counter/core/RankingResult.scala b/s2counter_core/src/main/scala/s2/counter/core/RankingResult.scala
deleted file mode 100644
index 42fae3e..0000000
--- a/s2counter_core/src/main/scala/s2/counter/core/RankingResult.scala
+++ /dev/null
@@ -1,6 +0,0 @@
-package s2.counter.core
-
-/**
- * Created by hsleep(honeysl...@gmail.com) on 15. 6. 19..
- */
-case class RankingResult(totalScore: Double, values: Seq[(String, Double)])
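
Taken together, the two small case classes above address and answer a top-K query: a bucket is identified by (policy id, storage version, time/dimension qualifier) and answered as a total score plus item-score pairs. An illustrative construction, assuming the ExactQualifier and TimedQualifier shapes used elsewhere in this module:

    // hypothetical values, for illustration only
    val key = RankingKey(1, 2.toByte, ExactQualifier(TimedQualifier(IntervalUnit.DAILY, 0L), "age.20"))
    val result = RankingResult(totalScore = 42.0, values = Seq("item7" -> 21.0, "item3" -> 9.0))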

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2counter_core/src/main/scala/s2/counter/core/RankingStorage.scala
----------------------------------------------------------------------
diff --git a/s2counter_core/src/main/scala/s2/counter/core/RankingStorage.scala b/s2counter_core/src/main/scala/s2/counter/core/RankingStorage.scala
deleted file mode 100644
index b643bd8..0000000
--- a/s2counter_core/src/main/scala/s2/counter/core/RankingStorage.scala
+++ /dev/null
@@ -1,19 +0,0 @@
-package s2.counter.core
-
-import s2.counter.core.RankingCounter.RankingValueMap
-import s2.models.Counter
-
-/**
- * Created by hsleep(honeysl...@gmail.com) on 15. 6. 22..
- */
-trait RankingStorage {
-  def getTopK(key: RankingKey, k: Int): Option[RankingResult]
-  def getTopK(keys: Seq[RankingKey], k: Int): Seq[(RankingKey, RankingResult)]
-  def update(key: RankingKey, value: RankingValueMap, k: Int): Unit
-  def update(values: Seq[(RankingKey, RankingValueMap)], k: Int): Unit
-  def delete(key: RankingKey)
-
-  def prepare(policy: Counter): Unit
-  def destroy(policy: Counter): Unit
-  def ready(policy: Counter): Boolean
-}
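
RankingStorage above is the storage contract RankingCounter delegates to. A minimal in-memory sketch of the single-key half, assuming the RankingKey, RankingValue, and RankingResult types from this module (the class name, the mutable-Map backing, and using the sum of stored scores as totalScore are illustrative choices, not the project's implementation):

    import scala.collection.mutable

    class InMemoryRankingStorageSketch {
      private val buckets = mutable.Map.empty[RankingKey, Map[String, Double]]

      def getTopK(key: RankingKey, k: Int): Option[RankingResult] =
        buckets.get(key).map { m =>
          val top = m.toSeq.sortBy { case (_, score) => -score }.take(k)
          RankingResult(m.values.sum, top)
        }

      def update(key: RankingKey, value: Map[String, RankingValue], k: Int): Unit = {
        val merged = buckets.getOrElse(key, Map.empty[String, Double]) ++
          value.map { case (item, rv) => item -> rv.score }
        // trim to the k best scores on write, as a bounded store would
        buckets(key) = merged.toSeq.sortBy { case (_, s) => -s }.take(k).toMap
      }
    }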

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2counter_core/src/main/scala/s2/counter/core/RankingValue.scala
----------------------------------------------------------------------
diff --git a/s2counter_core/src/main/scala/s2/counter/core/RankingValue.scala b/s2counter_core/src/main/scala/s2/counter/core/RankingValue.scala
deleted file mode 100644
index 4b67633..0000000
--- a/s2counter_core/src/main/scala/s2/counter/core/RankingValue.scala
+++ /dev/null
@@ -1,19 +0,0 @@
-package s2.counter.core
-
-/**
- * Created by hsleep(honeysl...@gmail.com) on 15. 6. 22..
- */
-
-/**
- * ranking score and increment value
- * @param score ranking score
- * @param increment increment value for v1
- */
-case class RankingValue(score: Double, increment: Double)
-
-object RankingValue {
-  def reduce(r1: RankingValue, r2: RankingValue): RankingValue = {
-    // maximum score and sum of increment
-    RankingValue(math.max(r1.score, r2.score), r1.increment + r2.increment)
-  }
-}
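
So reduce merges two partial aggregates for the same item by keeping the larger score and summing the increments. A worked example:

    val merged = RankingValue.reduce(RankingValue(score = 5.0, increment = 1.0),
                                     RankingValue(score = 3.0, increment = 2.0))
    // merged == RankingValue(5.0, 3.0)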

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2counter_core/src/main/scala/s2/counter/core/RateRankingValue.scala
----------------------------------------------------------------------
diff --git a/s2counter_core/src/main/scala/s2/counter/core/RateRankingValue.scala b/s2counter_core/src/main/scala/s2/counter/core/RateRankingValue.scala
deleted file mode 100644
index eca3abc..0000000
--- a/s2counter_core/src/main/scala/s2/counter/core/RateRankingValue.scala
+++ /dev/null
@@ -1,18 +0,0 @@
-package s2.counter.core
-
-/**
- * Created by hsleep(honeysl...@gmail.com) on 15. 7. 2..
- */
-case class RateRankingValue(actionScore: Double, baseScore: Double) {
-  // the increment score is not used for rate values.
-  lazy val rankingValue: RankingValue = {
-    RankingValue(actionScore / math.max(1d, baseScore), 0)
-  }
-}
-
-object RateRankingValue {
-  def reduce(r1: RateRankingValue, r2: RateRankingValue): RateRankingValue = {
-    // maximum of both the action and base scores
-    RateRankingValue(math.max(r1.actionScore, r2.actionScore), math.max(r1.baseScore, r2.baseScore))
-  }
-}
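
For rate counters the ranking score is actionScore / baseScore with the denominator floored at 1.0, and the increment is unused. A worked example:

    val rate = RateRankingValue(actionScore = 30.0, baseScore = 100.0)
    // rate.rankingValue == RankingValue(0.3, 0)
    val merged = RateRankingValue.reduce(RateRankingValue(1.0, 10.0), RateRankingValue(2.0, 5.0))
    // merged == RateRankingValue(2.0, 10.0): both components take the maximum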

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2counter_core/src/main/scala/s2/counter/core/TimedQualifier.scala
----------------------------------------------------------------------
diff --git a/s2counter_core/src/main/scala/s2/counter/core/TimedQualifier.scala b/s2counter_core/src/main/scala/s2/counter/core/TimedQualifier.scala
deleted file mode 100644
index b763bc2..0000000
--- a/s2counter_core/src/main/scala/s2/counter/core/TimedQualifier.scala
+++ /dev/null
@@ -1,218 +0,0 @@
-package s2.counter.core
-
-import java.text.SimpleDateFormat
-import java.util.Calendar
-
-/**
- * Created by hsleep(honeysl...@gmail.com) on 15. 6. 8..
- */
-case class TimedQualifier(q: TimedQualifier.IntervalUnit.Value, ts: Long) {
-  import TimedQualifier.IntervalUnit._
-
-  def dateTime: Long = {
-    val dateFormat = new SimpleDateFormat("yyyyMMddHHmm")
-    dateFormat.format(ts).toLong
-  }
-
-  def add(amount: Int): TimedQualifier = {
-    val cal = Calendar.getInstance()
-    cal.setTimeInMillis(ts)
-    q match {
-      case MINUTELY =>
-        cal.add(Calendar.MINUTE, amount)
-      case HOURLY =>
-        cal.add(Calendar.HOUR, amount)
-      case DAILY =>
-        cal.add(Calendar.DAY_OF_MONTH, amount)
-      case MONTHLY =>
-        cal.add(Calendar.MONTH, amount)
-      case TOTAL =>
-    }
-    copy(ts = cal.getTimeInMillis)
-  }
-}
-
-object TimedQualifier {
-  object IntervalUnit extends Enumeration {
-    type IntervalUnit = Value
-    val TOTAL = Value("t")
-    val MONTHLY = Value("M")
-    val DAILY = Value("d")
-    val HOURLY = Value("H")
-    val MINUTELY = Value("m")
-  }
-
-  def apply(q: String, ts: Long): TimedQualifier = TimedQualifier(IntervalUnit.withName(q), ts)
-
-  import IntervalUnit._
-
-  def getTsUnit(intervalUnit: IntervalUnit.IntervalUnit): Long = {
-    intervalUnit match {
-      case MINUTELY => 1 * 60 * 1000l
-      case HOURLY => 60 * 60 * 1000l
-      case DAILY => 24 * 60 * 60 * 1000l
-      case MONTHLY => 31 * 24 * 60 * 60 * 1000l
-      case v: IntervalUnit.IntervalUnit =>
-        throw new RuntimeException(s"unsupported operation for ${v.toString}")
-    }
-  }
-
-  def getQualifiers(intervals: Seq[IntervalUnit], millis: Long): Seq[TimedQualifier] = {
-    val cal = Calendar.getInstance()
-    cal.setTimeInMillis(millis)
-
-    val newCal = Calendar.getInstance()
-    newCal.set(cal.get(Calendar.YEAR), cal.get(Calendar.MONTH), 1, 0, 0, 0)
-    newCal.set(Calendar.MILLISECOND, 0)
-    val month = newCal.getTimeInMillis
-    val Seq(day, hour, minute) = {
-      for {
-        field <- Seq(Calendar.DATE, Calendar.HOUR_OF_DAY, Calendar.MINUTE)
-      } yield {
-        newCal.set(field, cal.get(field))
-        newCal.getTimeInMillis
-      }
-    }
-
-    for {
-      interval <- intervals
-    } yield {
-      val ts = interval match {
-        case MINUTELY => minute
-        case HOURLY => hour
-        case DAILY => day
-        case MONTHLY => month
-        case TOTAL => 0L
-      }
-      TimedQualifier(interval, ts)
-    }
-  }
-
-  // descending order
-  def getQualifiersToLimit(intervals: Seq[IntervalUnit], limit: Int, tsOpt: Option[Long] = None): Seq[TimedQualifier] = {
-    val ts = tsOpt.getOrElse(System.currentTimeMillis())
-    for {
-      interval <- intervals
-      newLimit = if (interval == TOTAL) 1 else limit
-      i <- 0 until (-newLimit, -1)
-    } yield {
-      val newMillis = nextTime(interval, ts, i)
-      TimedQualifier(interval, newMillis)
-    }
-  }
-
-  private def nextTime(interval: IntervalUnit, ts: Long, i: Int): Long = {
-    val newCal = Calendar.getInstance()
-    newCal.setTimeInMillis(ts)
-    newCal.set(Calendar.MILLISECOND, 0)
-    interval match {
-      case MINUTELY =>
-        newCal.set(Calendar.SECOND, 0)
-        newCal.add(Calendar.MINUTE, i)
-        newCal.getTimeInMillis
-      case HOURLY =>
-        newCal.set(Calendar.SECOND, 0)
-        newCal.set(Calendar.MINUTE, 0)
-        newCal.add(Calendar.HOUR_OF_DAY, i)
-        newCal.getTimeInMillis
-      case DAILY =>
-        newCal.set(Calendar.SECOND, 0)
-        newCal.set(Calendar.MINUTE, 0)
-        newCal.set(Calendar.HOUR_OF_DAY, 0)
-        newCal.add(Calendar.DAY_OF_MONTH, i)
-        newCal.getTimeInMillis
-      case MONTHLY =>
-        newCal.set(Calendar.SECOND, 0)
-        newCal.set(Calendar.MINUTE, 0)
-        newCal.set(Calendar.HOUR_OF_DAY, 0)
-        newCal.set(Calendar.DAY_OF_MONTH, 1)
-        newCal.add(Calendar.MONTH, i)
-        newCal.getTimeInMillis
-      case TOTAL =>
-        0L
-    }
-  }
-
-  def getTimeList(interval: IntervalUnit, from: Long, to: Long, rst: List[Long] = Nil): List[Long] = {
-    interval match {
-      case TOTAL => List(0)
-      case _ =>
-        val next = nextTime(interval, from, 1)
-        if (next < from) {
-          // ignore
-          getTimeList(interval, next, to, rst)
-        }
-        else if (next < to) {
-          // recurse
-          getTimeList(interval, next, to, rst :+ next)
-        } else {
-          // end condition
-          rst :+ next
-        }
-    }
-  }
-
-  // for reader
-  def getQualifiersToLimit(intervals: Seq[IntervalUnit],
-                           limit: Int,
-                           optFrom: Option[Long],
-                           optTo: Option[Long]): Seq[List[TimedQualifier]] = {
-    val newLimit = limit - 1
-    for {
-      interval <- intervals
-    } yield {
-      {
-        (optFrom, optTo) match {
-          case (Some(from), Some(to)) =>
-            getTimeList(interval, from, to)
-          case (Some(from), None) =>
-            getTimeList(interval, from, nextTime(interval, from, newLimit))
-          case (None, Some(to)) =>
-            getTimeList(interval, nextTime(interval, to, -newLimit), to)
-          case (None, None) =>
-            val current = System.currentTimeMillis()
-            getTimeList(interval, nextTime(interval, current, -newLimit), current)
-        }
-      }.map { ts =>
-        TimedQualifier(interval, ts)
-      }
-    }
-  }
-
-  def getTimeRange(intervals: Seq[IntervalUnit],
-                   limit: Int,
-                   optFrom: Option[Long],
-                   optTo: Option[Long]): Seq[(TimedQualifier, TimedQualifier)] = {
-    val newLimit = limit - 1
-    val maxInterval = intervals.maxBy {
-      case MINUTELY => 0
-      case HOURLY => 1
-      case DAILY => 2
-      case MONTHLY => 3
-      case TOTAL => 4
-    }
-    val minInterval = intervals.minBy {
-      case MINUTELY => 0
-      case HOURLY => 1
-      case DAILY => 2
-      case MONTHLY => 3
-      case TOTAL => 4
-    }
-    val (from, to) = (optFrom, optTo) match {
-      case (Some(f), Some(t)) =>
-        (f, t)
-      case (Some(f), None) =>
-        (f, nextTime(minInterval, f, newLimit))
-      case (None, Some(t)) =>
-        (nextTime(maxInterval, t, -newLimit), t)
-      case (None, None) =>
-        val current = System.currentTimeMillis()
-        (nextTime(maxInterval, current, -newLimit), nextTime(minInterval, current, 0))
-    }
-    for {
-      interval <- intervals
-    } yield {
-      (TimedQualifier(interval, from), TimedQualifier(interval, to))
-    }
-  }
-}
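
All of the interval arithmetic above reduces to the same Calendar recipe: zero out the sub-interval fields, then step whole intervals with add. A self-contained sketch of the DAILY case of nextTime:

    import java.util.Calendar

    object DailyBucketSketch {
      // truncate ts to the start of its day, then step i whole days
      def dailyBucket(ts: Long, i: Int): Long = {
        val cal = Calendar.getInstance()
        cal.setTimeInMillis(ts)
        cal.set(Calendar.MILLISECOND, 0)
        cal.set(Calendar.SECOND, 0)
        cal.set(Calendar.MINUTE, 0)
        cal.set(Calendar.HOUR_OF_DAY, 0)
        cal.add(Calendar.DAY_OF_MONTH, i) // i = -1 -> yesterday, 0 -> today
        cal.getTimeInMillis
      }

      def main(args: Array[String]): Unit =
        println(dailyBucket(System.currentTimeMillis(), -1))
    }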

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2counter_core/src/main/scala/s2/counter/core/v1/BytesUtilV1.scala
----------------------------------------------------------------------
diff --git a/s2counter_core/src/main/scala/s2/counter/core/v1/BytesUtilV1.scala b/s2counter_core/src/main/scala/s2/counter/core/v1/BytesUtilV1.scala
deleted file mode 100644
index 1b70f63..0000000
--- a/s2counter_core/src/main/scala/s2/counter/core/v1/BytesUtilV1.scala
+++ /dev/null
@@ -1,64 +0,0 @@
-package s2.counter.core.v1
-
-import org.apache.hadoop.hbase.util.Bytes
-import s2.counter.core.TimedQualifier.IntervalUnit
-import s2.counter.core._
-import s2.models.Counter.ItemType
-import s2.util.Hashes
-
-import scala.collection.mutable.ArrayBuffer
-
-/**
- * Created by hsleep(honeysl...@gmail.com) on 15. 6. 11..
- */
-object BytesUtilV1 extends BytesUtil {
-  // ExactKey: [hash(2b)][policy(4b)][item(variable)]
-  val BUCKET_BYTE_SIZE = Bytes.SIZEOF_SHORT
-  val POLICY_ID_SIZE = Bytes.SIZEOF_INT
-  val INTERVAL_SIZE = Bytes.SIZEOF_BYTE
-  val TIMESTAMP_SIZE = Bytes.SIZEOF_LONG
-  val TIMED_QUALIFIER_SIZE = INTERVAL_SIZE + TIMESTAMP_SIZE
-
-  override def getRowKeyPrefix(id: Int): Array[Byte] = {
-    Bytes.toBytes(id)
-  }
-
-  override def toBytes(key: ExactKeyTrait): Array[Byte] = {
-    val buff = new ArrayBuffer[Byte]
-    // hash key (2 byte)
-    buff ++= Bytes.toBytes(Hashes.murmur3(key.itemKey)).take(BUCKET_BYTE_SIZE)
-
-    buff ++= getRowKeyPrefix(key.policyId)
-    buff ++= {
-      key.itemType match {
-        case ItemType.INT => Bytes.toBytes(key.itemKey.toInt)
-        case ItemType.LONG => Bytes.toBytes(key.itemKey.toLong)
-        case ItemType.STRING | ItemType.BLOB => Bytes.toBytes(key.itemKey)
-      }
-    }
-    buff.toArray
-  }
-
-  override def toBytes(eq: ExactQualifier): Array[Byte] = {
-    toBytes(eq.tq) ++ eq.dimension.getBytes
-  }
-
-  override def toBytes(tq: TimedQualifier): Array[Byte] = {
-    Bytes.toBytes(tq.q.toString) ++ Bytes.toBytes(tq.ts)
-  }
-
-  override def toExactQualifier(bytes: Array[Byte]): ExactQualifier = {
-    // qualifier: interval, ts, dimension, in that order
-    val tq = toTimedQualifier(bytes)
-
-    val dimension = Bytes.toString(bytes, TIMED_QUALIFIER_SIZE, bytes.length - TIMED_QUALIFIER_SIZE)
-    ExactQualifier(tq, dimension)
-  }
-
-  override def toTimedQualifier(bytes: Array[Byte]): TimedQualifier = {
-    val interval = Bytes.toString(bytes, 0, INTERVAL_SIZE)
-    val ts = Bytes.toLong(bytes, INTERVAL_SIZE)
-
-    TimedQualifier(IntervalUnit.withName(interval), ts)
-  }
-}
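
The v1 layout spelled out above, [hash(2b)][policy(4b)][item(variable)], prefixes every row with two hash bytes so that one hot policy spreads across regions. A self-contained sketch of the key assembly (hashStub replaces the project's Hashes.murmur3 and is an assumption, not the real hash):

    import org.apache.hadoop.hbase.util.Bytes

    object RowKeySketch {
      def hashStub(s: String): Int = s.hashCode // stand-in for Hashes.murmur3

      def rowKey(policyId: Int, itemKey: String): Array[Byte] =
        Bytes.toBytes(hashStub(itemKey)).take(2) ++ // 2-byte bucket prefix
          Bytes.toBytes(policyId) ++                // 4-byte policy id
          Bytes.toBytes(itemKey)                    // variable-length item

      def main(args: Array[String]): Unit =
        println(Bytes.toStringBinary(rowKey(1, "item7")))
    }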

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2counter_core/src/main/scala/s2/counter/core/v1/ExactStorageAsyncHBase.scala
----------------------------------------------------------------------
diff --git a/s2counter_core/src/main/scala/s2/counter/core/v1/ExactStorageAsyncHBase.scala b/s2counter_core/src/main/scala/s2/counter/core/v1/ExactStorageAsyncHBase.scala
deleted file mode 100644
index 6aae3cd..0000000
--- a/s2counter_core/src/main/scala/s2/counter/core/v1/ExactStorageAsyncHBase.scala
+++ /dev/null
@@ -1,321 +0,0 @@
-package s2.counter.core.v1
-
-import java.util
-
-import com.stumbleupon.async.{Callback, Deferred}
-import com.typesafe.config.Config
-import org.apache.hadoop.hbase.CellUtil
-import org.apache.hadoop.hbase.client._
-import org.apache.hadoop.hbase.util.Bytes
-import org.hbase.async.{ColumnRangeFilter, FilterList, GetRequest, KeyValue}
-import org.slf4j.LoggerFactory
-import s2.config.S2CounterConfig
-import s2.counter.core.ExactCounter.ExactValueMap
-import s2.counter.core._
-import s2.helper.{Management, WithAsyncHBase, WithHBase}
-import s2.models.Counter
-import s2.models.Counter.ItemType
-
-import scala.collection.JavaConversions._
-import scala.concurrent.{ExecutionContext, Future}
-import scala.util.{Failure, Success}
-
-/**
- * Created by hsleep(honeysl...@gmail.com) on 15. 8. 19..
- */
-class ExactStorageAsyncHBase(config: Config) extends ExactStorage {
-  import ExactStorageHBase._
-
-  private val log = LoggerFactory.getLogger(getClass)
-
-  lazy val s2config = new S2CounterConfig(config)
-
-  private[counter] val withHBase = new WithHBase(config)
-  private[counter] val withAsyncHBase = new WithAsyncHBase(config)
-  private[counter] val hbaseManagement = new Management(config)
-
-  private def getTableName(policy: Counter): String = {
-    policy.hbaseTable.getOrElse(s2config.HBASE_TABLE_NAME)
-  }
-  
-  override def get(policy: Counter,
-                   items: Seq[String],
-                   timeRange: Seq[(TimedQualifier, TimedQualifier)])
-                  (implicit ex: ExecutionContext): Future[Seq[FetchedCounts]] = {
-
-    val tableName = getTableName(policy)
-
-    lazy val messageForLog = s"${policy.service}.${policy.action} $items $timeRange"
-
-    val keys = {
-      for {
-        item <- items
-      } yield {
-        ExactKey(policy, item, checkItemType = true)
-      }
-    }
-
-    val gets = {
-      for {
-        cf <- timeRange.map(t => intervalsMap(t._1.q)).distinct
-        key <- keys
-      } yield {
-        val get = new GetRequest(tableName, BytesUtilV1.toBytes(key))
-        get.family(cf.toString)
-        get.setFilter(new FilterList({
-          for {
-            (from, to) <- timeRange
-          } yield {
-            new ColumnRangeFilter(
-              BytesUtilV1.toBytes(from), true,
-              BytesUtilV1.toBytes(to.copy(ts = to.ts + 1)), false)
-          }
-        }, FilterList.Operator.MUST_PASS_ONE))
-        (key, cf, get)
-      }
-    }
-
-//    println(s"$messageForLog $gets")
-
-    withAsyncHBase[Seq[FetchedCounts]] { client =>
-      val deferreds: Seq[Deferred[FetchedCounts]] = {
-        for {
-          (key, cf, get) <- gets
-        } yield {
-          client.get(get).addCallback { new Callback[FetchedCounts, util.ArrayList[KeyValue]] {
-            override def call(kvs: util.ArrayList[KeyValue]): FetchedCounts = {
-              val qualifierWithCounts = {
-                for {
-                  kv <- kvs
-                  eq = BytesUtilV1.toExactQualifier(kv.qualifier())
-                } yield {
-                  eq -> Bytes.toLong(kv.value())
-                }
-              }.toMap
-//              println(s"$key $qualifierWithCounts")
-              FetchedCounts(key, qualifierWithCounts)
-            }
-          }}
-        }
-      }
-      Deferred.group(deferreds).addCallback { new Callback[Seq[FetchedCounts], util.ArrayList[FetchedCounts]] {
-        override def call(arg: util.ArrayList[FetchedCounts]): Seq[FetchedCounts] = {
-          for {
-            (key, fetchedGroup) <- Seq(arg: _*).groupBy(_.exactKey)
-          } yield {
-            fetchedGroup.reduce[FetchedCounts] { case (f1, f2) =>
-              FetchedCounts(key, f1.qualifierWithCountMap ++ f2.qualifierWithCountMap)
-            }
-          }
-        }.toSeq
-      }}
-    }
-  }
-
-  override def get(policy: Counter,
-                   items: Seq[String],
-                   timeRange: Seq[(TimedQualifier, TimedQualifier)],
-                   dimQuery: Map[String, Set[String]])
-                  (implicit ec: ExecutionContext): Future[Seq[FetchedCountsGrouped]] = {
-    get(policy, items, timeRange).map { fetchedLs =>
-      for {
-        FetchedCounts(exactKey, qualifierWithCountMap) <- fetchedLs
-      } yield {
-        val intervalWithCountMap = qualifierWithCountMap
-          .filter { case (eq, v) => eq.checkDimensionEquality(dimQuery) }
-          .groupBy { case (eq, v) => (eq.tq.q, eq.dimKeyValues) }
-        FetchedCountsGrouped(exactKey, intervalWithCountMap)
-      }
-    }
-  }
-
-  override def update(policy: Counter, counts: Seq[(ExactKeyTrait, ExactValueMap)]): Map[ExactKeyTrait, ExactValueMap] = {
-    // increment mutation to hbase
-    val increments = {
-      for {
-        (exactKey, values) <- counts
-        inc = new Increment(BytesUtilV1.toBytes(exactKey))
-      } yield {
-        for {
-          (eq, value) <- values
-        } {
-          inc.addColumn(intervalsMap.apply(eq.tq.q).toString.getBytes, BytesUtilV1.toBytes(eq), value)
-        }
-        // add column by dimension
-        inc
-      }
-    }
-
-    val results: Array[Object] = Array.fill(increments.size)(null)
-
-    withHBase(getTableName(policy)) { table =>
-      table.batch(increments, results)
-    } match {
-      case Failure(ex) =>
-        log.error(s"${ex.getMessage}")
-      case _ =>
-    }
-
-    assert(counts.length == results.length)
-
-    for {
-      ((exactKey, eqWithValue), result) <- counts.zip(results)
-    } yield {
-      val eqWithResult = result match {
-        case r: Result =>
-          for {
-            (eq, value) <- eqWithValue
-          } yield {
-            val interval = eq.tq.q
-            val cf = intervalsMap(interval)
-            val result = Option(r.getColumnLatestCell(cf.toString.getBytes, BytesUtilV1.toBytes(eq))).map { cell =>
-              Bytes.toLong(CellUtil.cloneValue(cell))
-            }.getOrElse(-1l)
-            eq -> result
-          }
-        case ex: Throwable =>
-          log.error(s"${ex.getMessage}: $exactKey")
-          Nil
-        case _ =>
-          log.error(s"result is null: $exactKey")
-          Nil
-      }
-      (exactKey, eqWithResult.toMap)
-    }
-  }.toMap
-
-  override def delete(policy: Counter, keys: Seq[ExactKeyTrait]): Unit = {
-    withHBase(getTableName(policy)) { table =>
-      table.delete {
-        for {
-          key <- keys
-        } yield {
-          new Delete(BytesUtilV1.toBytes(key))
-        }
-      }
-    } match {
-      case Failure(ex) =>
-        log.error(ex.getMessage)
-      case _ =>
-    }
-  }
-
-  override def get(policy: Counter,
-                   queries: Seq[(ExactKeyTrait, Seq[ExactQualifier])])
-                  (implicit ex: ExecutionContext): Future[Seq[FetchedCounts]] = {
-
-    val tableName = getTableName(policy)
-
-    val gets = {
-      for {
-        (key, eqs) <- queries
-        (cf, eqsGrouped) <- eqs.groupBy(eq => intervalsMap(eq.tq.q))
-      } yield {
-//        println(s"$key $eqsGrouped")
-        val get = new GetRequest(tableName, BytesUtilV1.toBytes(key))
-        get.family(cf.toString)
-        get.qualifiers(eqsGrouped.map(BytesUtilV1.toBytes).toArray)
-        (key, cf, get)
-      }
-    }
-
-    withAsyncHBase[Seq[FetchedCounts]] { client =>
-      val deferreds: Seq[Deferred[FetchedCounts]] = {
-        for {
-          (key, cf, get) <- gets
-        } yield {
-          client.get(get).addCallback { new Callback[FetchedCounts, util.ArrayList[KeyValue]] {
-            override def call(kvs: util.ArrayList[KeyValue]): FetchedCounts = {
-              val qualifierWithCounts = {
-                for {
-                  kv <- kvs
-                  eq = BytesUtilV1.toExactQualifier(kv.qualifier())
-                } yield {
-                  eq -> Bytes.toLong(kv.value())
-                }
-              }.toMap
-              FetchedCounts(key, qualifierWithCounts)
-            }
-          }}
-        }
-      }
-      Deferred.group(deferreds).addCallback { new Callback[Seq[FetchedCounts], util.ArrayList[FetchedCounts]] {
-        override def call(arg: util.ArrayList[FetchedCounts]): Seq[FetchedCounts] = arg
-      }}
-    }
-  }
-
-  override def insertBlobValue(policy: Counter, keys: Seq[BlobExactKey]): Seq[Boolean] = {
-    val results: Array[Object] = Array.fill(keys.size)(null)
-
-    val puts = keys.map { key =>
-      val put = new Put(BytesUtilV1.toBytes(key))
-      put.addColumn(blobCF, blobColumn, key.itemId.getBytes)
-    }
-
-    withHBase(getTableName(policy)) { table =>
-      table.batch(puts, results)
-    } match {
-      case Failure(ex) =>
-        log.error(s"${ex.getMessage}")
-      case _ =>
-    }
-
-    for {
-      (result, key) <- results.zip(keys)
-    } yield {
-      Option(result).map(_ => true).getOrElse {
-        log.error(s"fail to insert blob value: $key")
-        false
-      }
-    }
-  }
-
-  override def getBlobValue(policy: Counter, blobId: String): Option[String] = {
-    lazy val messageForLog = s"${policy.service}.${policy.action}.$blobId"
-
-    policy.itemType match {
-      case ItemType.BLOB =>
-        withHBase(getTableName(policy)) { table =>
-          val rowKey = BytesUtilV1.toBytes(ExactKey(policy.id, policy.version, policy.itemType, blobId))
-          val get = new Get(rowKey)
-          get.addColumn(blobCF, blobColumn)
-          table.get(get)
-        } match {
-          case Success(result) =>
-            Option(result).filter(!_.isEmpty).map { rst =>
-              Bytes.toString(rst.getValue(blobCF, blobColumn))
-            }
-          case Failure(ex) =>
-            throw ex
-        }
-      case _ =>
-        log.warn(s"is not blob type counter. $messageForLog")
-        throw new Exception(s"is not blob type counter. $messageForLog")
-    }
-  }
-
-  override def prepare(policy: Counter): Unit = {
-    // create hbase table
-    policy.hbaseTable.foreach { table =>
-      if (!hbaseManagement.tableExists(s2config.HBASE_ZOOKEEPER_QUORUM, table)) {
-        hbaseManagement.createTable(s2config.HBASE_ZOOKEEPER_QUORUM, table,
-          ColumnFamily.values.map(_.toString).toList, 1)
-        hbaseManagement.setTTL(s2config.HBASE_ZOOKEEPER_QUORUM, table, ColumnFamily.SHORT.toString, policy.ttl)
-        policy.dailyTtl.foreach { i =>
-          hbaseManagement.setTTL(s2config.HBASE_ZOOKEEPER_QUORUM, table, ColumnFamily.LONG.toString, i * 24 * 60 * 60)
-        }
-      }
-    }
-  }
-
-  override def destroy(policy: Counter): Unit = {
-
-  }
-
-  override def ready(policy: Counter): Boolean = {
-    policy.hbaseTable.map { table =>
-      hbaseManagement.tableExists(s2config.HBASE_ZOOKEEPER_QUORUM, table)
-    }.getOrElse(true)
-  }
-}
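
The asynchbase plumbing above wraps every GetRequest in Callback chains and regroups them with Deferred.group. The WithAsyncHBase helper is not part of this commit; a common way to adapt a Deferred to a Scala Future is a Promise bridge, sketched here under that assumption rather than as the project's actual helper:

    import com.stumbleupon.async.{Callback, Deferred}
    import scala.concurrent.{Future, Promise}

    object DeferredBridgeSketch {
      def toFuture[T](d: Deferred[T]): Future[T] = {
        val p = Promise[T]()
        // success path: complete the promise with the deferred's result
        d.addCallback(new Callback[Unit, T] {
          override def call(arg: T): Unit = p.success(arg)
        })
        // error path: fail the promise if the chain raised
        d.addErrback(new Callback[Unit, Exception] {
          override def call(e: Exception): Unit = p.failure(e)
        })
        p.future
      }
    }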
