http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/TimedQualifier.scala
----------------------------------------------------------------------
diff --git a/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/TimedQualifier.scala b/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/TimedQualifier.scala
new file mode 100644
index 0000000..ad2bea2
--- /dev/null
+++ b/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/TimedQualifier.scala
@@ -0,0 +1,215 @@
+package org.apache.s2graph.counter.core
+
+import java.text.SimpleDateFormat
+import java.util.Calendar
+
+case class TimedQualifier(q: TimedQualifier.IntervalUnit.Value, ts: Long) {
+  import TimedQualifier.IntervalUnit._
+
+  def dateTime: Long = {
+    val dateFormat = new SimpleDateFormat("yyyyMMddHHmm")
+    dateFormat.format(ts).toLong
+  }
+
+  def add(amount: Int): TimedQualifier = {
+    val cal = Calendar.getInstance()
+    cal.setTimeInMillis(ts)
+    q match {
+      case MINUTELY =>
+        cal.add(Calendar.MINUTE, amount)
+      case HOURLY =>
+        cal.add(Calendar.HOUR, amount)
+      case DAILY =>
+        cal.add(Calendar.DAY_OF_MONTH, amount)
+      case MONTHLY =>
+        cal.add(Calendar.MONTH, amount)
+      case TOTAL =>
+    }
+    copy(ts = cal.getTimeInMillis)
+  }
+}
+
+object TimedQualifier {
+  object IntervalUnit extends Enumeration {
+    type IntervalUnit = Value
+    val TOTAL = Value("t")
+    val MONTHLY = Value("M")
+    val DAILY = Value("d")
+    val HOURLY = Value("H")
+    val MINUTELY = Value("m")
+  }
+
+  def apply(q: String, ts: Long): TimedQualifier = TimedQualifier(IntervalUnit.withName(q), ts)
+
+  import IntervalUnit._
+
+  def getTsUnit(intervalUnit: IntervalUnit.IntervalUnit): Long = {
+    intervalUnit match {
+      case MINUTELY => 1 * 60 * 1000l
+      case HOURLY => 60 * 60 * 1000l
+      case DAILY => 24 * 60 * 60 * 1000l
+      case MONTHLY => 31 * 24 * 60 * 60 * 1000l
+      case v: IntervalUnit.IntervalUnit =>
+        throw new RuntimeException(s"unsupported operation for ${v.toString}")
+    }
+  }
+
+  def getQualifiers(intervals: Seq[IntervalUnit], millis: Long): Seq[TimedQualifier] = {
+    val cal = Calendar.getInstance()
+    cal.setTimeInMillis(millis)
+
+    val newCal = Calendar.getInstance()
+    newCal.set(cal.get(Calendar.YEAR), cal.get(Calendar.MONTH), 1, 0, 0, 0)
+    newCal.set(Calendar.MILLISECOND, 0)
+    val month = newCal.getTimeInMillis
+    val Seq(day, hour, minute) = {
+      for {
+        field <- Seq(Calendar.DATE, Calendar.HOUR_OF_DAY, Calendar.MINUTE)
+      } yield {
+        newCal.set(field, cal.get(field))
+        newCal.getTimeInMillis
+      }
+    }
+
+    for {
+      interval <- intervals
+    } yield {
+      val ts = interval match {
+        case MINUTELY => minute
+        case HOURLY => hour
+        case DAILY => day
+        case MONTHLY => month
+        case TOTAL => 0L
+      }
+      TimedQualifier(interval, ts)
+    }
+  }
+
+  // descending order
+  def getQualifiersToLimit(intervals: Seq[IntervalUnit], limit: Int, tsOpt: Option[Long] = None): Seq[TimedQualifier] = {
+    val ts = tsOpt.getOrElse(System.currentTimeMillis())
+    for {
+      interval <- intervals
+      newLimit = if (interval == TOTAL) 1 else limit
+      i <- 0 until (-newLimit, -1)
+    } yield {
+      val newMillis = nextTime(interval, ts, i)
+      TimedQualifier(interval, newMillis)
+    }
+  }
+
+  private def nextTime(interval: IntervalUnit, ts: Long, i: Int): Long = {
+    val newCal = Calendar.getInstance()
+    newCal.setTimeInMillis(ts)
+    newCal.set(Calendar.MILLISECOND, 0)
+    interval match {
+      case MINUTELY =>
+        newCal.set(Calendar.SECOND, 0)
+        newCal.add(Calendar.MINUTE, i)
+        newCal.getTimeInMillis
+      case HOURLY =>
+        newCal.set(Calendar.SECOND, 0)
+        newCal.set(Calendar.MINUTE, 0)
+        newCal.add(Calendar.HOUR_OF_DAY, i)
+        newCal.getTimeInMillis
+      case DAILY =>
+        newCal.set(Calendar.SECOND, 0)
+        newCal.set(Calendar.MINUTE, 0)
+        newCal.set(Calendar.HOUR_OF_DAY, 0)
+        newCal.add(Calendar.DAY_OF_MONTH, i)
+        newCal.getTimeInMillis
+      case MONTHLY =>
+        newCal.set(Calendar.SECOND, 0)
+        newCal.set(Calendar.MINUTE, 0)
+        newCal.set(Calendar.HOUR_OF_DAY, 0)
+        newCal.set(Calendar.DAY_OF_MONTH, 1)
+        newCal.add(Calendar.MONTH, i)
+        newCal.getTimeInMillis
+      case TOTAL =>
+        0L
+    }
+  }
+
+  def getTimeList(interval: IntervalUnit, from: Long, to: Long, rst: List[Long] = Nil): List[Long] = {
+    interval match {
+      case TOTAL => List(0)
+      case _ =>
+        val next = nextTime(interval, from, 1)
+        if (next < from) {
+          // ignore
+          getTimeList(interval, next, to, rst)
+        }
+        else if (next < to) {
+          // recall
+          getTimeList(interval, next, to, rst :+ next)
+        } else {
+          // end condition
+          rst :+ next
+        }
+    }
+  }
+
+  // for reader
+  def getQualifiersToLimit(intervals: Seq[IntervalUnit],
+                           limit: Int,
+                           optFrom: Option[Long],
+                           optTo: Option[Long]): Seq[List[TimedQualifier]] = {
+    val newLimit = limit - 1
+    for {
+      interval <- intervals
+    } yield {
+      {
+        (optFrom, optTo) match {
+          case (Some(from), Some(to)) =>
+            getTimeList(interval, from, to)
+          case (Some(from), None) =>
+            getTimeList(interval, from, nextTime(interval, from, newLimit))
+          case (None, Some(to)) =>
+            getTimeList(interval, nextTime(interval, to, -newLimit), to)
+          case (None, None) =>
+            val current = System.currentTimeMillis()
+            getTimeList(interval, nextTime(interval, current, -newLimit), current)
+        }
+      }.map { ts =>
+        TimedQualifier(interval, ts)
+      }
+    }
+  }
+
+  def getTimeRange(intervals: Seq[IntervalUnit],
+                   limit: Int,
+                   optFrom: Option[Long],
+                   optTo: Option[Long]): Seq[(TimedQualifier, TimedQualifier)] = {
+    val newLimit = limit - 1
+    val maxInterval = intervals.maxBy {
+      case MINUTELY => 0
+      case HOURLY => 1
+      case DAILY => 2
+      case MONTHLY => 3
+      case TOTAL => 4
+    }
+    val minInterval = intervals.minBy {
+      case MINUTELY => 0
+      case HOURLY => 1
+      case DAILY => 2
+      case MONTHLY => 3
+      case TOTAL => 4
+    }
+    val (from, to) = (optFrom, optTo) match {
+      case (Some(f), Some(t)) =>
+        (f, t)
+      case (Some(f), None) =>
+        (f, nextTime(minInterval, f, newLimit))
+      case (None, Some(t)) =>
+        (nextTime(maxInterval, t, -newLimit), t)
+      case (None, None) =>
+        val current = System.currentTimeMillis()
+        (nextTime(maxInterval, current, -newLimit), nextTime(minInterval, current, 0))
+    }
+    for {
+      interval <- intervals
+    } yield {
+      (TimedQualifier(interval, from), TimedQualifier(interval, to))
+    }
+  }
+}
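
A minimal usage sketch of the helpers above (all names come from this diff; only the wrapper object is invented):

    import org.apache.s2graph.counter.core.TimedQualifier
    import org.apache.s2graph.counter.core.TimedQualifier.IntervalUnit

    object TimedQualifierSketch extends App {
      val now = System.currentTimeMillis()
      // last 3 hourly buckets, newest first (getQualifiersToLimit is descending)
      val hourly = TimedQualifier.getQualifiersToLimit(Seq(IntervalUnit.HOURLY), 3, Some(now))
      hourly.foreach(tq => println(tq.dateTime))
      // daily bucket boundaries covering roughly the last 3 days
      val days = TimedQualifier.getTimeList(IntervalUnit.DAILY, now - 3 * 86400000L, now)
      println(days)
    }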

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/v1/BytesUtilV1.scala
----------------------------------------------------------------------
diff --git a/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/v1/BytesUtilV1.scala b/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/v1/BytesUtilV1.scala
new file mode 100644
index 0000000..eaef60d
--- /dev/null
+++ b/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/v1/BytesUtilV1.scala
@@ -0,0 +1,60 @@
+package org.apache.s2graph.counter.core.v1
+
+import org.apache.hadoop.hbase.util.Bytes
+import org.apache.s2graph.counter.core.TimedQualifier.IntervalUnit
+import org.apache.s2graph.counter.core.{TimedQualifier, ExactQualifier, ExactKeyTrait, BytesUtil}
+import org.apache.s2graph.counter.models.Counter.ItemType
+import org.apache.s2graph.counter.util.Hashes
+import scala.collection.mutable.ArrayBuffer
+
+object BytesUtilV1 extends BytesUtil {
+  // ExactKey: [hash(2b)][policy(4b)][item(variable)]
+  val BUCKET_BYTE_SIZE = Bytes.SIZEOF_SHORT
+  val POLICY_ID_SIZE = Bytes.SIZEOF_INT
+  val INTERVAL_SIZE = Bytes.SIZEOF_BYTE
+  val TIMESTAMP_SIZE = Bytes.SIZEOF_LONG
+  val TIMED_QUALIFIER_SIZE = INTERVAL_SIZE + TIMESTAMP_SIZE
+
+  override def getRowKeyPrefix(id: Int): Array[Byte] = {
+    Bytes.toBytes(id)
+  }
+
+  override def toBytes(key: ExactKeyTrait): Array[Byte] = {
+    val buff = new ArrayBuffer[Byte]
+    // hash key (2 byte)
+    buff ++= Bytes.toBytes(Hashes.murmur3(key.itemKey)).take(BUCKET_BYTE_SIZE)
+
+    buff ++= getRowKeyPrefix(key.policyId)
+    buff ++= {
+      key.itemType match {
+        case ItemType.INT => Bytes.toBytes(key.itemKey.toInt)
+        case ItemType.LONG => Bytes.toBytes(key.itemKey.toLong)
+        case ItemType.STRING | ItemType.BLOB => Bytes.toBytes(key.itemKey)
+      }
+    }
+    buff.toArray
+  }
+
+  override def toBytes(eq: ExactQualifier): Array[Byte] = {
+    toBytes(eq.tq) ++ eq.dimension.getBytes
+  }
+
+  override def toBytes(tq: TimedQualifier): Array[Byte] = {
+    Bytes.toBytes(tq.q.toString) ++ Bytes.toBytes(tq.ts)
+  }
+
+  override def toExactQualifier(bytes: Array[Byte]): ExactQualifier = {
+    // qualifier layout order: interval, ts, dimension
+    val tq = toTimedQualifier(bytes)
+
+    val dimension = Bytes.toString(bytes, TIMED_QUALIFIER_SIZE, bytes.length - TIMED_QUALIFIER_SIZE)
+    ExactQualifier(tq, dimension)
+  }
+
+  override def toTimedQualifier(bytes: Array[Byte]): TimedQualifier = {
+    val interval = Bytes.toString(bytes, 0, INTERVAL_SIZE)
+    val ts = Bytes.toLong(bytes, INTERVAL_SIZE)
+
+    TimedQualifier(IntervalUnit.withName(interval), ts)
+  }
+}
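
A quick round-trip sketch of the v1 qualifier layout these methods implement, [interval(1b)][ts(8b)][dimension(variable)]; the sample values are made up:

    import org.apache.s2graph.counter.core.{ExactQualifier, TimedQualifier}
    import org.apache.s2graph.counter.core.v1.BytesUtilV1

    val eq = ExactQualifier(TimedQualifier("d", 1450000000000L), "age.32")
    val bytes = BytesUtilV1.toBytes(eq)             // 1 + 8 + "age.32".length bytes
    val back  = BytesUtilV1.toExactQualifier(bytes) // decodes interval, ts, then dimension
    println((back.tq.q, back.tq.ts, back.dimension))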

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/v1/ExactStorageAsyncHBase.scala
----------------------------------------------------------------------
diff --git a/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/v1/ExactStorageAsyncHBase.scala b/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/v1/ExactStorageAsyncHBase.scala
new file mode 100644
index 0000000..340986f
--- /dev/null
+++ b/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/v1/ExactStorageAsyncHBase.scala
@@ -0,0 +1,317 @@
+package org.apache.s2graph.counter.core.v1
+
+import java.util
+import com.stumbleupon.async.{Callback, Deferred}
+import com.typesafe.config.Config
+import org.apache.hadoop.hbase.CellUtil
+import org.apache.hadoop.hbase.client._
+import org.apache.hadoop.hbase.util.Bytes
+import org.apache.s2graph.counter.config.S2CounterConfig
+import org.apache.s2graph.counter.core
+import org.apache.s2graph.counter.core.ExactCounter.ExactValueMap
+import org.apache.s2graph.counter.core._
+import org.apache.s2graph.counter.helper.{Management, WithAsyncHBase, WithHBase}
+import org.apache.s2graph.counter.models.Counter
+import org.apache.s2graph.counter.models.Counter.ItemType
+import org.hbase.async.{KeyValue, ColumnRangeFilter, FilterList, GetRequest}
+import org.slf4j.LoggerFactory
+import scala.collection.JavaConversions._
+import scala.concurrent.{ExecutionContext, Future}
+import scala.util.{Failure, Success}
+
+class ExactStorageAsyncHBase(config: Config) extends ExactStorage {
+  import ExactStorageHBase._
+
+  private val log = LoggerFactory.getLogger(getClass)
+
+  lazy val s2config = new S2CounterConfig(config)
+
+  private[counter] val withHBase = new WithHBase(config)
+  private[counter] val withAsyncHBase = new WithAsyncHBase(config)
+  private[counter] val hbaseManagement = new Management(config)
+
+  private def getTableName(policy: Counter): String = {
+    policy.hbaseTable.getOrElse(s2config.HBASE_TABLE_NAME)
+  }
+  
+  override def get(policy: Counter,
+                   items: Seq[String],
+                   timeRange: Seq[(core.TimedQualifier, core.TimedQualifier)])
+                  (implicit ex: ExecutionContext): Future[Seq[FetchedCounts]] = {
+
+    val tableName = getTableName(policy)
+
+    lazy val messageForLog = s"${policy.service}.${policy.action} $items $timeRange"
+
+    val keys = {
+      for {
+        item <- items
+      } yield {
+        ExactKey(policy, item, checkItemType = true)
+      }
+    }
+
+    val gets = {
+      for {
+        cf <- timeRange.map(t => intervalsMap(t._1.q)).distinct
+        key <- keys
+      } yield {
+        val get = new GetRequest(tableName, BytesUtilV1.toBytes(key))
+        get.family(cf.toString)
+        get.setFilter(new FilterList({
+          for {
+            (from, to) <- timeRange
+          } yield {
+            new ColumnRangeFilter(
+              BytesUtilV1.toBytes(from), true,
+              BytesUtilV1.toBytes(to.copy(ts = to.ts + 1)), false)
+          }
+        }, FilterList.Operator.MUST_PASS_ONE))
+        (key, cf, get)
+      }
+    }
+
+//    println(s"$messageForLog $gets")
+
+    withAsyncHBase[Seq[FetchedCounts]] { client =>
+      val deferreds: Seq[Deferred[FetchedCounts]] = {
+        for {
+          (key, cf, get) <- gets
+        } yield {
+          client.get(get).addCallback { new Callback[FetchedCounts, util.ArrayList[KeyValue]] {
+            override def call(kvs: util.ArrayList[KeyValue]): FetchedCounts = {
+              val qualifierWithCounts = {
+                for {
+                  kv <- kvs
+                  eq = BytesUtilV1.toExactQualifier(kv.qualifier())
+                } yield {
+                  eq -> Bytes.toLong(kv.value())
+                }
+              }.toMap
+//              println(s"$key $qualifierWithCounts")
+              FetchedCounts(key, qualifierWithCounts)
+            }
+          }}
+        }
+      }
+      Deferred.group(deferreds).addCallback { new Callback[Seq[FetchedCounts], util.ArrayList[FetchedCounts]] {
+        override def call(arg: util.ArrayList[FetchedCounts]): Seq[FetchedCounts] = {
+          for {
+            (key, fetchedGroup) <- Seq(arg: _*).groupBy(_.exactKey)
+          } yield {
+            fetchedGroup.reduce[FetchedCounts] { case (f1, f2) =>
+              FetchedCounts(key, f1.qualifierWithCountMap ++ f2.qualifierWithCountMap)
+            }
+          }
+        }.toSeq
+      }}
+    }
+  }
+
+  override def get(policy: Counter,
+                   items: Seq[String],
+                   timeRange: Seq[(core.TimedQualifier, core.TimedQualifier)],
+                   dimQuery: Map[String, Set[String]])
+                  (implicit ec: ExecutionContext): Future[Seq[FetchedCountsGrouped]] = {
+    get(policy, items, timeRange).map { fetchedLs =>
+      for {
+        FetchedCounts(exactKey, qualifierWithCountMap) <- fetchedLs
+      } yield {
+        val intervalWithCountMap = qualifierWithCountMap
+          .filter { case (eq, v) => eq.checkDimensionEquality(dimQuery) }
+          .groupBy { case (eq, v) => (eq.tq.q, eq.dimKeyValues) }
+        FetchedCountsGrouped(exactKey, intervalWithCountMap)
+      }
+    }
+  }
+
+  override def update(policy: Counter, counts: Seq[(ExactKeyTrait, ExactValueMap)]): Map[ExactKeyTrait, ExactValueMap] = {
+    // increment mutation to hbase
+    val increments = {
+      for {
+        (exactKey, values) <- counts
+        inc = new Increment(BytesUtilV1.toBytes(exactKey))
+      } yield {
+        for {
+          (eq, value) <- values
+        } {
+          inc.addColumn(intervalsMap.apply(eq.tq.q).toString.getBytes, BytesUtilV1.toBytes(eq), value)
+        }
+        // add column by dimension
+        inc
+      }
+    }
+
+    val results: Array[Object] = Array.fill(increments.size)(null)
+
+    withHBase(getTableName(policy)) { table =>
+      table.batch(increments, results)
+    } match {
+      case Failure(ex) =>
+        log.error(s"${ex.getMessage}")
+      case _ =>
+    }
+
+    assert(counts.length == results.length)
+
+    for {
+      ((exactKey, eqWithValue), result) <- counts.zip(results)
+    } yield {
+      val eqWithResult = result match {
+        case r: Result =>
+          for {
+            (eq, value) <- eqWithValue
+          } yield {
+            val interval = eq.tq.q
+            val cf = intervalsMap(interval)
+            val result = Option(r.getColumnLatestCell(cf.toString.getBytes, BytesUtilV1.toBytes(eq))).map { cell =>
+              Bytes.toLong(CellUtil.cloneValue(cell))
+            }.getOrElse(-1l)
+            eq -> result
+          }
+        case ex: Throwable =>
+          log.error(s"${ex.getMessage}: $exactKey")
+          Nil
+        case _ =>
+          log.error(s"result is null: $exactKey")
+          Nil
+      }
+      (exactKey, eqWithResult.toMap)
+    }
+  }.toMap
+
+  override def delete(policy: Counter, keys: Seq[ExactKeyTrait]): Unit = {
+    withHBase(getTableName(policy)) { table =>
+      table.delete {
+        for {
+          key <- keys
+        } yield {
+          new Delete(BytesUtilV1.toBytes(key))
+        }
+      }
+    } match {
+      case Failure(ex) =>
+        log.error(ex.getMessage)
+      case _ =>
+    }
+  }
+
+  override def get(policy: Counter,
+                   queries: Seq[(ExactKeyTrait, Seq[core.ExactQualifier])])
+                  (implicit ex: ExecutionContext): Future[Seq[FetchedCounts]] = {
+
+    val tableName = getTableName(policy)
+
+    val gets = {
+      for {
+        (key, eqs) <- queries
+        (cf, eqsGrouped) <- eqs.groupBy(eq => intervalsMap(eq.tq.q))
+      } yield {
+//        println(s"$key $eqsGrouped")
+        val get = new GetRequest(tableName, BytesUtilV1.toBytes(key))
+        get.family(cf.toString)
+        get.qualifiers(eqsGrouped.map(BytesUtilV1.toBytes).toArray)
+        (key, cf, get)
+      }
+    }
+
+    withAsyncHBase[Seq[FetchedCounts]] { client =>
+      val deferreds: Seq[Deferred[FetchedCounts]] = {
+        for {
+          (key, cf, get) <- gets
+        } yield {
+          client.get(get).addCallback { new Callback[FetchedCounts, util.ArrayList[KeyValue]] {
+            override def call(kvs: util.ArrayList[KeyValue]): FetchedCounts = {
+              val qualifierWithCounts = {
+                for {
+                  kv <- kvs
+                  eq = BytesUtilV1.toExactQualifier(kv.qualifier())
+                } yield {
+                  eq -> Bytes.toLong(kv.value())
+                }
+              }.toMap
+              FetchedCounts(key, qualifierWithCounts)
+            }
+          }}
+        }
+      }
+      Deferred.group(deferreds).addCallback { new Callback[Seq[FetchedCounts], util.ArrayList[FetchedCounts]] {
+        override def call(arg: util.ArrayList[FetchedCounts]): Seq[FetchedCounts] = arg
+      }}
+    }
+  }
+
+  override def insertBlobValue(policy: Counter, keys: Seq[BlobExactKey]): Seq[Boolean] = {
+    val results: Array[Object] = Array.fill(keys.size)(null)
+
+    val puts = keys.map { key =>
+      val put = new Put(BytesUtilV1.toBytes(key))
+      put.addColumn(blobCF, blobColumn, key.itemId.getBytes)
+    }
+
+    withHBase(getTableName(policy)) { table =>
+      table.batch(puts, results)
+    } match {
+      case Failure(ex) =>
+        log.error(s"${ex.getMessage}")
+      case _ =>
+    }
+
+    for {
+      (result, key) <- results.zip(keys)
+    } yield {
+      Option(result).map(_ => true).getOrElse {
+        log.error(s"fail to insert blob value: $key")
+        false
+      }
+    }
+  }
+
+  override def getBlobValue(policy: Counter, blobId: String): Option[String] = {
+    lazy val messageForLog = s"${policy.service}.${policy.action}.$blobId"
+
+    policy.itemType match {
+      case ItemType.BLOB =>
+        withHBase(getTableName(policy)) { table =>
+          val rowKey = BytesUtilV1.toBytes(ExactKey(policy.id, policy.version, policy.itemType, blobId))
+          val get = new Get(rowKey)
+          get.addColumn(blobCF, blobColumn)
+          table.get(get)
+        } match {
+          case Success(result) =>
+            Option(result).filter(!_.isEmpty).map { rst =>
+              Bytes.toString(rst.getValue(blobCF, blobColumn))
+            }
+          case Failure(ex) =>
+            throw ex
+        }
+      case _ =>
+        log.warn(s"is not blob type counter. $messageForLog")
+        throw new Exception(s"is not blob type counter. $messageForLog")
+    }
+  }
+
+  override def prepare(policy: Counter): Unit = {
+    // create hbase table
+    policy.hbaseTable.foreach { table =>
+      if (!hbaseManagement.tableExists(s2config.HBASE_ZOOKEEPER_QUORUM, table)) {
+        hbaseManagement.createTable(s2config.HBASE_ZOOKEEPER_QUORUM, table,
+          ColumnFamily.values.map(_.toString).toList, 1)
+        hbaseManagement.setTTL(s2config.HBASE_ZOOKEEPER_QUORUM, table, ColumnFamily.SHORT.toString, policy.ttl)
+        policy.dailyTtl.foreach { i =>
+          hbaseManagement.setTTL(s2config.HBASE_ZOOKEEPER_QUORUM, table, ColumnFamily.LONG.toString, i * 24 * 60 * 60)
+        }
+      }
+    }
+  }
+
+  override def destroy(policy: Counter): Unit = {
+
+  }
+
+  override def ready(policy: Counter): Boolean = {
+    policy.hbaseTable.map { table =>
+      hbaseManagement.tableExists(s2config.HBASE_ZOOKEEPER_QUORUM, table)
+    }.getOrElse(true)
+  }
+}
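
An illustrative call site for the async read path (a sketch: it assumes a Typesafe Config `config` and a Counter `policy` are already in scope; everything else is from this diff):

    import scala.concurrent.ExecutionContext.Implicits.global
    import org.apache.s2graph.counter.core.TimedQualifier
    import org.apache.s2graph.counter.core.TimedQualifier.IntervalUnit
    import org.apache.s2graph.counter.core.v1.ExactStorageAsyncHBase

    val storage = new ExactStorageAsyncHBase(config)
    // (from, to) pairs for the last 7 daily buckets, then one async multi-get
    val ranges = TimedQualifier.getTimeRange(Seq(IntervalUnit.DAILY), 7, None, None)
    storage.get(policy, Seq("item.1"), ranges).foreach { fetched =>
      fetched.foreach(fc => println(fc.exactKey, fc.qualifierWithCountMap))
    }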

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/v1/ExactStorageHBase.scala
----------------------------------------------------------------------
diff --git a/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/v1/ExactStorageHBase.scala b/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/v1/ExactStorageHBase.scala
new file mode 100644
index 0000000..d6d87d3
--- /dev/null
+++ b/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/v1/ExactStorageHBase.scala
@@ -0,0 +1,326 @@
+package org.apache.s2graph.counter.core.v1
+
+import com.typesafe.config.Config
+import org.apache.hadoop.hbase.CellUtil
+import org.apache.hadoop.hbase.client._
+import org.apache.hadoop.hbase.filter.{ColumnRangeFilter, FilterList}
+import org.apache.hadoop.hbase.util.Bytes
+import org.apache.s2graph.counter.config.S2CounterConfig
+import org.apache.s2graph.counter.core
+import org.apache.s2graph.counter.core.ExactCounter.ExactValueMap
+import org.apache.s2graph.counter.core._
+import org.apache.s2graph.counter.helper.{Management, WithHBase}
+import org.apache.s2graph.counter.models.Counter
+import org.apache.s2graph.counter.models.Counter.ItemType
+import org.slf4j.LoggerFactory
+import scala.collection.JavaConversions._
+import scala.concurrent.{ExecutionContext, Future}
+import scala.util.{Failure, Success}
+
+
+class ExactStorageHBase(config: Config) extends ExactStorage {
+  import ExactStorageHBase._
+
+  private val log = LoggerFactory.getLogger(getClass)
+
+  lazy val s2config = new S2CounterConfig(config)
+
+  private[counter] val withHBase = new WithHBase(config)
+  private[counter] val hbaseManagement = new Management(config)
+
+
+
+  private def getTableName(policy: Counter): String = {
+    policy.hbaseTable.getOrElse(s2config.HBASE_TABLE_NAME)
+  }
+
+  override def get(policy: Counter,
+                   items: Seq[String],
+                   timeRange: Seq[(core.TimedQualifier, core.TimedQualifier)])
+                  (implicit ec: ExecutionContext): Future[Seq[FetchedCounts]] = {
+    lazy val messageForLog = s"${policy.service}.${policy.action} $items $timeRange"
+
+    val keys = {
+      for {
+        item <- items
+      } yield {
+        ExactKey(policy, item, checkItemType = true)
+      }
+    }
+
+    val gets = {
+      for {
+        key <- keys
+      } yield {
+        val get = new Get(BytesUtilV1.toBytes(key))
+        timeRange.map(t => intervalsMap(t._1.q)).distinct.foreach { cf =>
+          get.addFamily(cf.toString.getBytes)
+        }
+        get.setFilter(new FilterList(FilterList.Operator.MUST_PASS_ONE, {
+          for {
+            (from, to) <- timeRange
+          } yield {
+            new ColumnRangeFilter(
+              BytesUtilV1.toBytes(from), true,
+              BytesUtilV1.toBytes(to.copy(ts = to.ts + 1)), false)
+          }
+        }))
+      }
+    }
+
+    //    println(s"$messageForLog $gets")
+    Future {
+      withHBase(getTableName(policy)) { table =>
+        for {
+          (rst, key) <- table.get(gets).zip(keys) if !rst.isEmpty
+        } yield {
+          val qualifierWithCounts = {
+            for {
+              cell <- rst.listCells()
+              eq = BytesUtilV1.toExactQualifier(CellUtil.cloneQualifier(cell))
+            } yield {
+              eq -> Bytes.toLong(CellUtil.cloneValue(cell))
+            }
+          }.toMap
+          FetchedCounts(key, qualifierWithCounts)
+        }
+      } match {
+        case Success(rst) => rst
+        case Failure(ex) =>
+          log.error(s"$ex: $messageForLog")
+          Nil
+      }
+    }
+  }
+
+  override def get(policy: Counter,
+                   items: Seq[String],
+                   timeRange: Seq[(core.TimedQualifier, core.TimedQualifier)],
+                   dimQuery: Map[String, Set[String]])
+                  (implicit ec: ExecutionContext): Future[Seq[FetchedCountsGrouped]] = {
+    get(policy, items, timeRange).map { fetchedLs =>
+      for {
+        FetchedCounts(exactKey, qualifierWithCountMap) <- fetchedLs
+      } yield {
+        val intervalWithCountMap = qualifierWithCountMap
+          .filter { case (eq, v) => eq.checkDimensionEquality(dimQuery) }
+          .groupBy { case (eq, v) => (eq.tq.q, eq.dimKeyValues) }
+        FetchedCountsGrouped(exactKey, intervalWithCountMap)
+      }
+    }
+  }
+
+  override def update(policy: Counter, counts: Seq[(ExactKeyTrait, ExactValueMap)]): Map[ExactKeyTrait, ExactValueMap] = {
+    // increment mutation to hbase
+    val increments = {
+      for {
+        (exactKey, values) <- counts
+        inc = new Increment(BytesUtilV1.toBytes(exactKey))
+      } yield {
+        for {
+          (eq, value) <- values
+        } {
+          inc.addColumn(intervalsMap.apply(eq.tq.q).toString.getBytes, BytesUtilV1.toBytes(eq), value)
+        }
+        // add column by dimension
+        inc
+      }
+    }
+
+    val results: Array[Object] = Array.fill(increments.size)(null)
+
+    withHBase(getTableName(policy)) { table =>
+      table.batch(increments, results)
+    } match {
+      case Failure(ex) =>
+        log.error(s"${ex.getMessage}")
+      case _ =>
+    }
+
+    assert(counts.length == results.length)
+
+    for {
+      ((exactKey, eqWithValue), result) <- counts.zip(results)
+    } yield {
+      val eqWithResult = result match {
+        case r: Result =>
+          for {
+            (eq, value) <- eqWithValue
+          } yield {
+            val interval = eq.tq.q
+            val cf = intervalsMap(interval)
+            val result = Option(r.getColumnLatestCell(cf.toString.getBytes, BytesUtilV1.toBytes(eq))).map { cell =>
+              Bytes.toLong(CellUtil.cloneValue(cell))
+            }.getOrElse(-1l)
+            eq -> result
+          }
+        case ex: Throwable =>
+          log.error(s"${ex.getMessage}: $exactKey")
+          Nil
+        case _ =>
+          log.error(s"result is null: $exactKey")
+          Nil
+      }
+      (exactKey, eqWithResult.toMap)
+    }
+  }.toMap
+
+  override def delete(policy: Counter, keys: Seq[ExactKeyTrait]): Unit = {
+    withHBase(getTableName(policy)) { table =>
+      table.delete {
+        for {
+          key <- keys
+        } yield {
+          new Delete(BytesUtilV1.toBytes(key))
+        }
+      }
+    } match {
+      case Failure(ex) =>
+        log.error(ex.getMessage)
+      case _ =>
+    }
+  }
+
+  override def get(policy: Counter,
+                   queries: Seq[(ExactKeyTrait, Seq[core.ExactQualifier])])
+                  (implicit ec: ExecutionContext): Future[Seq[FetchedCounts]] = {
+    lazy val messageForLog = s"${policy.service}.${policy.action} $queries"
+
+    val gets = {
+      for {
+        (key, eqs) <- queries
+      } yield {
+        //        println(s"$key $eqsGrouped")
+        val get = new Get(BytesUtilV1.toBytes(key))
+
+        for {
+          eq <- eqs
+        } {
+          val cf = intervalsMap(eq.tq.q)
+          get.addColumn(cf.toString.getBytes, BytesUtilV1.toBytes(eq))
+        }
+        get
+      }
+    }
+
+    Future {
+      withHBase(getTableName(policy)) { table =>
+        for {
+          (rst, key) <- table.get(gets).zip(queries.map(_._1)) if !rst.isEmpty
+        } yield {
+          val qualifierWithCounts = {
+            for {
+              cell <- rst.listCells()
+              eq = BytesUtilV1.toExactQualifier(CellUtil.cloneQualifier(cell))
+            } yield {
+              eq -> Bytes.toLong(CellUtil.cloneValue(cell))
+            }
+          }.toMap
+          FetchedCounts(key, qualifierWithCounts)
+        }
+      } match {
+        case Success(rst) => rst.toSeq
+        case Failure(ex) =>
+          log.error(s"$ex: $messageForLog")
+          Nil
+      }
+    }
+  }
+
+  override def insertBlobValue(policy: Counter, keys: Seq[BlobExactKey]): Seq[Boolean] = {
+    val results: Array[Object] = Array.fill(keys.size)(null)
+
+    val puts = keys.map { key =>
+      val put = new Put(BytesUtilV1.toBytes(key))
+      put.addColumn(blobCF, blobColumn, key.itemId.getBytes)
+    }
+
+    withHBase(getTableName(policy)) { table =>
+      table.batch(puts, results)
+    } match {
+      case Failure(ex) =>
+        log.error(s"${ex.getMessage}")
+      case _ =>
+    }
+
+    for {
+      (result, key) <- results.zip(keys)
+    } yield {
+      Option(result).map(_ => true).getOrElse {
+        log.error(s"fail to insert blob value: $key")
+        false
+      }
+    }
+  }
+
+  override def getBlobValue(policy: Counter, blobId: String): Option[String] = {
+    lazy val messageForLog = s"${policy.service}.${policy.action}.$blobId"
+
+    policy.itemType match {
+      case ItemType.BLOB =>
+        withHBase(getTableName(policy)) { table =>
+          val rowKey = BytesUtilV1.toBytes(ExactKey(policy.id, policy.version, policy.itemType, blobId))
+          val get = new Get(rowKey)
+          get.addColumn(blobCF, blobColumn)
+          table.get(get)
+        } match {
+          case Success(result) =>
+            Option(result).filter(!_.isEmpty).map { rst =>
+              Bytes.toString(rst.getValue(blobCF, blobColumn))
+            }
+          case Failure(ex) =>
+            throw ex
+        }
+      case _ =>
+        log.warn(s"is not blob type counter. $messageForLog")
+        throw new Exception(s"is not blob type counter. $messageForLog")
+    }
+  }
+
+  override def prepare(policy: Counter): Unit = {
+    // create hbase table
+    policy.hbaseTable.foreach { table =>
+
+      if (!hbaseManagement.tableExists(s2config.HBASE_ZOOKEEPER_QUORUM, table)) {
+        hbaseManagement.createTable(s2config.HBASE_ZOOKEEPER_QUORUM, table,
+          ColumnFamily.values.map(_.toString).toList, 1)
+        hbaseManagement.setTTL(s2config.HBASE_ZOOKEEPER_QUORUM, table, ColumnFamily.SHORT.toString, policy.ttl)
+        policy.dailyTtl.foreach { i =>
+          hbaseManagement.setTTL(s2config.HBASE_ZOOKEEPER_QUORUM, table, ColumnFamily.LONG.toString, i * 24 * 60 * 60)
+        }
+      }
+    }
+  }
+
+  override def destroy(policy: Counter): Unit = {
+
+  }
+
+  override def ready(policy: Counter): Boolean = {
+    policy.hbaseTable.map { table =>
+      hbaseManagement.tableExists(s2config.HBASE_ZOOKEEPER_QUORUM, table)
+    }.getOrElse(true)
+  }
+}
+
+object ExactStorageHBase {
+  import core.TimedQualifier.IntervalUnit._
+
+  object ColumnFamily extends Enumeration {
+    type ColumnFamily = Value
+
+    val SHORT = Value("s")
+    val LONG = Value("l")
+  }
+
+  val blobCF = ColumnFamily.LONG.toString.getBytes
+  val blobColumn = "b".getBytes
+
+  val intervalsMap = Map(
+    MINUTELY -> ColumnFamily.SHORT,
+    HOURLY -> ColumnFamily.SHORT,
+    DAILY -> ColumnFamily.LONG,
+    MONTHLY -> ColumnFamily.LONG,
+    TOTAL -> ColumnFamily.LONG
+  )
+}
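
Note how the companion object's intervalsMap drives TTL policy: minutely and hourly counters land in the short-lived column family, daily/monthly/total in the long-lived one, and prepare() sets a different TTL on each. A two-line check, for reference:

    import org.apache.s2graph.counter.core.TimedQualifier.IntervalUnit._
    import org.apache.s2graph.counter.core.v1.ExactStorageHBase.intervalsMap

    println(intervalsMap(MINUTELY))  // s (ColumnFamily.SHORT)
    println(intervalsMap(TOTAL))     // l (ColumnFamily.LONG)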

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/v1/RankingStorageRedis.scala
----------------------------------------------------------------------
diff --git a/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/v1/RankingStorageRedis.scala b/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/v1/RankingStorageRedis.scala
new file mode 100644
index 0000000..15ff380
--- /dev/null
+++ b/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/v1/RankingStorageRedis.scala
@@ -0,0 +1,185 @@
+package org.apache.s2graph.counter.core.v1
+
+import java.lang
+
+import com.typesafe.config.Config
+import org.apache.s2graph.counter.core.RankingCounter.RankingValueMap
+import org.apache.s2graph.counter.core.TimedQualifier.IntervalUnit
+import org.apache.s2graph.counter.core.{RankingResult, RankingKey, RankingStorage}
+import org.apache.s2graph.counter.helper.WithRedis
+import org.apache.s2graph.counter.models.{Counter, CounterModel}
+import org.slf4j.LoggerFactory
+import redis.clients.jedis.Pipeline
+import scala.collection.JavaConversions._
+import scala.util.{Failure, Success}
+
+class RankingStorageRedis(config: Config) extends RankingStorage {
+  private[counter] val log = LoggerFactory.getLogger(this.getClass)
+  private[counter] val withRedis = new WithRedis(config)
+
+  val counterModel = new CounterModel(config)
+
+  val TOTAL = "_total_"
+
+  /**
+   * ex1)
+   * dimension = "age.32"
+   * ex2)
+   * dimension = "age.gender.32.m"
+   *
+   */
+  private def makeBucket(rankingKey: RankingKey): String = {
+    val policyId = rankingKey.policyId
+    val q = rankingKey.eq.tq.q
+    val ts = rankingKey.eq.tq.ts
+    val dimension = rankingKey.eq.dimension
+    if (dimension.nonEmpty) {
+      s"$policyId.$q.$ts.$dimension"
+    }
+    else {
+      s"$policyId.$q.$ts"
+    }
+  }
+
+  override def getTopK(rankingKey: RankingKey, k: Int): Option[RankingResult] = {
+    val bucket = makeBucket(rankingKey)
+    withRedis.doBlockWithKey(bucket) { jedis =>
+      jedis.zrevrangeByScoreWithScores(bucket, "+inf", "-inf", 0, k + 1).toSeq.map(t => (t.getElement, t.getScore))
+    } match {
+      case Success(values) =>
+        if (values.nonEmpty) {
+//          println(values)
+          Some(RankingResult(values.find(_._1 == TOTAL).map(_._2).getOrElse(-1d), values.filter(_._1 != TOTAL).take(k)))
+        }
+        else {
+          None
+        }
+      case Failure(ex) =>
+        log.error(s"fail to get top k($ex). $rankingKey")
+        None
+    }
+  }
+
+  private def getTTL(policyId: Int, intervalUnit: IntervalUnit.IntervalUnit): Option[Int] = {
+    counterModel.findById(policyId).flatMap { policy =>
+      intervalUnit match {
+        case IntervalUnit.MINUTELY => Some(policy.ttl)
+        case IntervalUnit.HOURLY => Some(policy.ttl)
+        // default daily ttl: 31 days
+        case IntervalUnit.DAILY => Some(policy.dailyTtl.getOrElse(31) * 24 * 3600)
+        case IntervalUnit.MONTHLY => policy.dailyTtl
+        case IntervalUnit.TOTAL => policy.dailyTtl
+      }
+    }
+  }
+
+  override def update(key: RankingKey, value: RankingValueMap, k: Int): Unit = {
+    // update ranking by score
+    val bucket = makeBucket(key)
+    withRedis.doBlockWithKey(bucket) { jedis =>
+      val pipeline = jedis.pipelined()
+      updateItem(pipeline, bucket, key, value, k)
+      pipeline.sync()
+    } match {
+      case Failure(ex) =>
+        log.error(s"fail to update $key $value: $ex")
+      case _ =>
+    }
+  }
+
+  private def updateItem(pipeline: Pipeline, bucket: String, key: RankingKey, value: RankingValueMap, k: Int): Unit = {
+    val topSeq = value.map { case (item, rv) =>
+      // jedis client accept only java's double
+      item -> rv.score.asInstanceOf[lang.Double]
+    }.toSeq.sortBy(_._2).takeRight(k)
+    pipeline.zadd(bucket, topSeq.toMap[String, lang.Double])
+    pipeline.zincrby(bucket, value.mapValues(_.increment).values.sum, TOTAL)
+    pipeline.zremrangeByRank(bucket, 0, -(k + 1))
+    // if ttl defined, set expire time to bucket
+    getTTL(key.policyId, key.eq.tq.q).foreach { ttl =>
+      pipeline.expire(bucket, ttl)
+    }
+  }
+
+  override def update(values: Seq[(RankingKey, RankingValueMap)], k: Int): Unit = {
+    values.map { case (key, value) =>
+      (makeBucket(key), key, value)
+    }.groupBy { case (bucket, key, value) =>
+      withRedis.getBucketIdx(bucket)
+    }.foreach { case (idx, seq) =>
+      withRedis.doBlockWithIndex(idx) { jedis =>
+        val pipeline = jedis.pipelined()
+        for {
+          (bucket, key, value) <- seq
+        } {
+          updateItem(pipeline, bucket, key, value, k)
+        }
+        pipeline.sync()
+      } match {
+        case Failure(ex) =>
+          log.error(s"fail to update multi $idx: $ex")
+        case _ =>
+      }
+    }
+  }
+
+  override def delete(key: RankingKey): Unit = {
+    val bucket = makeBucket(key)
+    withRedis.doBlockWithKey(bucket) { jedis =>
+      jedis.del(bucket)
+    } match {
+      case Success(deleted) =>
+        log.info(s"success to delete $key")
+      case Failure(ex) =>
+        log.error(s"fail to delete $key")
+    }
+  }
+
+  override def getTopK(keys: Seq[RankingKey], k: Int): Seq[(RankingKey, RankingResult)] = {
+    keys.map { key =>
+      (makeBucket(key), key)
+    }.groupBy { case (bucket, key) =>
+      withRedis.getBucketIdx(bucket)
+    }.toSeq.par.flatMap { case (idx, seq) =>
+      withRedis.doBlockWithIndex(idx) { jedis =>
+        val pipeline = jedis.pipelined()
+        val keyWithRespLs = {
+          for {
+            (bucket, rankingKey) <- seq
+          } yield {
+            (rankingKey, pipeline.zrevrangeByScoreWithScores(bucket, "+inf", "-inf", 0, k + 1))
+          }
+        }
+        pipeline.sync()
+        for {
+          (rankingKey, resp) <- keyWithRespLs
+        } yield {
+          (rankingKey, resp.get().toSeq.map { t => (t.getElement, t.getScore)})
+        }
+      } match {
+        case Success(keyWithValues) =>
+          for {
+            (rankingKey, values) <- keyWithValues
+          } yield {
+            val result = RankingResult(values.find(_._1 == TOTAL).map(_._2).getOrElse(-1d), values.filter(_._1 != TOTAL).take(k))
+            (rankingKey, result)
+          }
+        case Failure(ex) =>
+          Nil
+      }
+    }
+  }.seq
+
+  override def prepare(policy: Counter): Unit = {
+    // do nothing
+  }
+
+  override def destroy(policy: Counter): Unit = {
+
+  }
+
+  override def ready(policy: Counter): Boolean = {
+    // always return true
+    true
+  }
+}
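
For orientation: each ranking bucket is one Redis sorted set keyed "<policyId>.<interval>.<ts>[.<dimension>]", holding the top-k members plus the reserved "_total_" member, so updateItem is essentially ZADD + ZINCRBY + ZREMRANGEBYRANK + EXPIRE. A read-side sketch, assuming redis hosts in `config` and a RankingKey `rankingKey` in scope:

    import org.apache.s2graph.counter.core.v1.RankingStorageRedis

    val storage = new RankingStorageRedis(config)
    storage.getTopK(rankingKey, 10).foreach(println)  // RankingResult(total, topK) when the bucket exists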

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/v2/BytesUtilV2.scala
----------------------------------------------------------------------
diff --git a/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/v2/BytesUtilV2.scala b/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/v2/BytesUtilV2.scala
new file mode 100644
index 0000000..a37506c
--- /dev/null
+++ b/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/v2/BytesUtilV2.scala
@@ -0,0 +1,88 @@
+package org.apache.s2graph.counter.core.v2
+
+import org.apache.hadoop.hbase.util._
+import org.apache.s2graph.counter
+import org.apache.s2graph.counter.core.TimedQualifier.IntervalUnit
+import org.apache.s2graph.counter.core.{TimedQualifier, ExactQualifier, ExactKeyTrait, BytesUtil}
+import org.apache.s2graph.counter.models.Counter.ItemType
+import org.apache.s2graph.counter.util.Hashes
+import scala.collection.mutable.ArrayBuffer
+
+object BytesUtilV2 extends BytesUtil {
+  // ExactKey: [hash(1b)][version(1b)][policy(4b)][item(variable)]
+  val BUCKET_BYTE_SIZE = Bytes.SIZEOF_BYTE
+  val VERSION_BYTE_SIZE = Bytes.SIZEOF_BYTE
+  val POLICY_ID_SIZE = Bytes.SIZEOF_INT
+
+  val INTERVAL_SIZE = Bytes.SIZEOF_BYTE
+  val TIMESTAMP_SIZE = Bytes.SIZEOF_LONG
+  val TIMED_QUALIFIER_SIZE = INTERVAL_SIZE + TIMESTAMP_SIZE
+
+  override def getRowKeyPrefix(id: Int): Array[Byte] = {
+    Array(counter.VERSION_2) ++ Bytes.toBytes(id)
+  }
+
+  override def toBytes(key: ExactKeyTrait): Array[Byte] = {
+    val buff = new ArrayBuffer[Byte]
+    // hash byte
+    buff ++= Bytes.toBytes(Hashes.murmur3(key.itemKey)).take(BUCKET_BYTE_SIZE)
+
+    // row key prefix
+    // version + policy id
+    buff ++= getRowKeyPrefix(key.policyId)
+
+    buff ++= {
+      key.itemType match {
+        case ItemType.INT => Bytes.toBytes(key.itemKey.toInt)
+        case ItemType.LONG => Bytes.toBytes(key.itemKey.toLong)
+        case ItemType.STRING | ItemType.BLOB => Bytes.toBytes(key.itemKey)
+      }
+    }
+    buff.toArray
+  }
+
+  override def toBytes(eq: ExactQualifier): Array[Byte] = {
+    val len = eq.dimKeyValues.map { case (k, v) => k.length + 2 + v.length + 2 }.sum
+    val pbr = new SimplePositionedMutableByteRange(len)
+    for {
+      v <- ExactQualifier.makeSortedDimension(eq.dimKeyValues)
+    } {
+      OrderedBytes.encodeString(pbr, v, Order.ASCENDING)
+    }
+    toBytes(eq.tq) ++ pbr.getBytes
+  }
+
+  override def toBytes(tq: TimedQualifier): Array[Byte] = {
+    val pbr = new SimplePositionedMutableByteRange(INTERVAL_SIZE + 2 + TIMESTAMP_SIZE + 1)
+    OrderedBytes.encodeString(pbr, tq.q.toString, Order.ASCENDING)
+    OrderedBytes.encodeInt64(pbr, tq.ts, Order.DESCENDING)
+    pbr.getBytes
+  }
+
+  private def decodeString(pbr: PositionedByteRange): Stream[String] = {
+    if (pbr.getRemaining > 0) {
+      Stream.cons(OrderedBytes.decodeString(pbr), decodeString(pbr))
+    }
+    else {
+      Stream.empty
+    }
+  }
+
+  override def toExactQualifier(bytes: Array[Byte]): ExactQualifier = {
+    val pbr = new SimplePositionedByteRange(bytes)
+    ExactQualifier(toTimedQualifier(pbr), {
+      val seqStr = decodeString(pbr).toSeq
+      val (keys, values) = seqStr.splitAt(seqStr.length / 2)
+      keys.zip(values).toMap
+    })
+  }
+
+  override def toTimedQualifier(bytes: Array[Byte]): TimedQualifier = {
+    val pbr = new SimplePositionedByteRange(bytes)
+    toTimedQualifier(pbr)
+  }
+
+  def toTimedQualifier(pbr: PositionedByteRange): TimedQualifier = {
+    TimedQualifier(IntervalUnit.withName(OrderedBytes.decodeString(pbr)), OrderedBytes.decodeInt64(pbr))
+  }
+}
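
Worth noting in toBytes(tq): OrderedBytes encodes the interval string ascending but the timestamp descending, so within one interval the newest bucket sorts first. A round-trip sketch with made-up values:

    import org.apache.s2graph.counter.core.TimedQualifier
    import org.apache.s2graph.counter.core.v2.BytesUtilV2

    val tq = TimedQualifier("H", 1450000000000L)
    val bytes = BytesUtilV2.toBytes(tq)           // encodeString(q) ++ encodeInt64(ts, DESCENDING)
    println(BytesUtilV2.toTimedQualifier(bytes))  // TimedQualifier(H, 1450000000000)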

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/v2/ExactStorageGraph.scala
----------------------------------------------------------------------
diff --git a/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/v2/ExactStorageGraph.scala b/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/v2/ExactStorageGraph.scala
new file mode 100644
index 0000000..522cf18
--- /dev/null
+++ b/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/v2/ExactStorageGraph.scala
@@ -0,0 +1,343 @@
+package org.apache.s2graph.counter.core.v2
+
+import com.typesafe.config.Config
+import org.apache.http.HttpStatus
+import org.apache.s2graph.core.mysqls.Label
+import org.apache.s2graph.counter.config.S2CounterConfig
+import org.apache.s2graph.counter.core
+import org.apache.s2graph.counter.core.ExactCounter.ExactValueMap
+import org.apache.s2graph.counter.core._
+import org.apache.s2graph.counter.models.Counter
+import org.apache.s2graph.counter.util.CartesianProduct
+import org.slf4j.LoggerFactory
+import play.api.libs.json._
+import scala.concurrent.duration._
+import scala.concurrent.{Await, ExecutionContext, Future}
+
+object ExactStorageGraph {
+  case class RespGraph(success: Boolean, result: Long)
+  implicit val respGraphFormat = Json.format[RespGraph]
+
+  // using play-ws without play app
+  private val builder = new com.ning.http.client.AsyncHttpClientConfig.Builder()
+  private val wsClient = new play.api.libs.ws.ning.NingWSClient(builder.build)
+}
+
+case class ExactStorageGraph(config: Config) extends ExactStorage {
+  private val log = LoggerFactory.getLogger(this.getClass)
+  private val s2config = new S2CounterConfig(config)
+
+  private val SERVICE_NAME = "s2counter"
+  private val COLUMN_NAME = "bucket"
+  private val labelPostfix = "_counts"
+
+  val s2graphUrl = s2config.GRAPH_URL
+  val s2graphReadOnlyUrl = s2config.GRAPH_READONLY_URL
+  val graphOp = new GraphOperation(config)
+
+  import ExactStorageGraph._
+
+  override def update(policy: Counter, counts: Seq[(ExactKeyTrait, ExactValueMap)]): Map[ExactKeyTrait, ExactValueMap] = {
+    import scala.concurrent.ExecutionContext.Implicits.global
+
+    val (keyWithEq, reqJsLs) = toIncrementCountRequests(policy, counts).unzip(x => ((x._1, x._2), x._3))
+
+    val future = wsClient.url(s"$s2graphUrl/graphs/edges/incrementCount").post(Json.toJson(reqJsLs)).map { resp =>
+      resp.status match {
+        case HttpStatus.SC_OK =>
+          val respSeq = resp.json.as[Seq[RespGraph]]
+
+          val keyWithEqResult = {
+            for {
+              ((key, eq), RespGraph(success, result)) <- keyWithEq.zip(respSeq)
+            } yield {
+              (key, (eq, result))
+            }
+          }.groupBy(_._1).mapValues{ seq => seq.map(_._2).toMap }
+          keyWithEqResult
+        case _ =>
+          throw new RuntimeException(s"update failed: $policy $counts")
+      }
+    }
+    Await.result(future, 10 second)
+  }
+
+  def delete(policy: Counter, keys: Seq[ExactKeyTrait]): Unit = {
+
+  }
+
+  private def toIncrementCountRequests(policy: Counter,
+                                       counts: Seq[(ExactKeyTrait, ExactValueMap)])
+  : Seq[(ExactKeyTrait, core.ExactQualifier, JsValue)] = {
+    val labelName = policy.action + labelPostfix
+    val timestamp = System.currentTimeMillis()
+    for {
+      (exactKey, values) <- counts
+      (eq, value) <- values
+    } yield {
+      val from = exactKey.itemKey
+      val to = eq.dimension
+      val json = Json.obj(
+        "timestamp" -> timestamp,
+        "operation" -> "incrementCount",
+        "from" -> from,
+        "to" -> to,
+        "label" -> labelName,
+        "props" -> Json.obj(
+          "_count" -> value,
+          "time_unit" -> eq.tq.q.toString,
+          "time_value" -> eq.tq.ts
+        )
+      )
+      (exactKey, eq, json)
+    }
+  }
+
+  override def get(policy: Counter,
+                   items: Seq[String],
+                   timeRange: Seq[(core.TimedQualifier, core.TimedQualifier)],
+                   dimQuery: Map[String, Set[String]])
+                  (implicit ex: ExecutionContext): Future[Seq[FetchedCountsGrouped]] = {
+    val labelName = policy.action + labelPostfix
+    val label = Label.findByName(labelName).get
+//    val label = labelModel.findByName(labelName).get
+
+    val ids = Json.toJson(items)
+
+    val dimensions = {
+      for {
+        values <- CartesianProduct(dimQuery.values.map(ss => ss.toList).toList)
+      } yield {
+        dimQuery.keys.zip(values).toMap
+      }
+    }
+
+    val stepJsLs = {
+      for {
+        (tqFrom, tqTo) <- timeRange
+        dimension <- dimensions
+      } yield {
+        val eqFrom = core.ExactQualifier(tqFrom, dimension)
+        val eqTo = core.ExactQualifier(tqTo, dimension)
+        val intervalJs =
+          s"""
+            |{
+            |  "from": {
+            |    "_to": "${eqFrom.dimension}",
+            |    "time_unit": "${eqFrom.tq.q}",
+            |    "time_value": ${eqFrom.tq.ts}
+            |  },
+            |  "to": {
+            |    "_to": "${eqTo.dimension}",
+            |    "time_unit": "${eqTo.tq.q}",
+            |    "time_value": ${eqTo.tq.ts + 1}
+            |  }
+            |}
+          """.stripMargin
+        val stepJs =
+          s"""
+            |{
+            |  "direction": "out",
+            |  "limit": -1,
+            |  "duplicate": "raw",
+            |  "label": "$labelName",
+            |  "interval": $intervalJs
+            |}
+           """.stripMargin
+        stepJs
+      }
+    }
+
+    val reqJsStr =
+      s"""
+        |{
+        |  "srcVertices": [
+        |    {"serviceName": "${policy.service}", "columnName": "${label.srcColumnName}", "ids": $ids}
+        |  ],
+        |  "steps": [
+        |    {
+        |      "step": [
+        |        ${stepJsLs.mkString(",")}
+        |      ]
+        |    }
+        |  ]
+        |}
+      """.stripMargin
+
+    val reqJs = Json.parse(reqJsStr)
+//    log.warn(s"query: ${reqJs.toString()}")
+
+    wsClient.url(s"$s2graphReadOnlyUrl/graphs/getEdges").post(reqJs).map { resp =>
+      resp.status match {
+        case HttpStatus.SC_OK =>
+          val respJs = resp.json
+//          println(respJs)
+          val keyWithValues = (respJs \ "results").as[Seq[JsValue]].map { result =>
+//            println(s"result: $result")
+            resultToExactKeyValues(policy, result)
+          }.groupBy(_._1).mapValues(seq => seq.map(_._2).toMap.groupBy { case (eq, v) => (eq.tq.q, eq.dimKeyValues) })
+          for {
+            (k, v) <- keyWithValues.toSeq
+          } yield {
+            FetchedCountsGrouped(k, v)
+          }
+        case n: Int =>
+          log.warn(s"getEdges status($n): $reqJsStr")
+//          println(s"getEdges status($n): $reqJsStr")
+          Nil
+      }
+    }
+  }
+
+  private def resultToExactKeyValues(policy: Counter, result: JsValue): (ExactKeyTrait, (core.ExactQualifier, Long)) = {
+    val from = result \ "from" match {
+      case s: JsString => s.as[String]
+      case n: JsNumber => n.as[Long].toString
+      case x: JsValue => throw new RuntimeException(s"$x's type must be string or number")
+    }
+    val dimension = (result \ "to").as[String]
+    val props = result \ "props"
+    val count = (props \ "_count").as[Long]
+    val timeUnit = (props \ "time_unit").as[String]
+    val timeValue = (props \ "time_value").as[Long]
+    (ExactKey(policy, from, checkItemType = true), (core.ExactQualifier(core.TimedQualifier(timeUnit, timeValue), dimension), count))
+  }
+
+  private def getInner(policy: Counter, key: ExactKeyTrait, eqs: Seq[core.ExactQualifier])
+                      (implicit ex: ExecutionContext): Future[Seq[FetchedCounts]] = {
+    val labelName = policy.action + labelPostfix
+
+    Label.findByName(labelName) match {
+      case Some(label) =>
+
+        val src = Json.obj("serviceName" -> policy.service, "columnName" -> label.srcColumnName, "id" -> key.itemKey)
+        val step = {
+          val stepLs = {
+            for {
+              eq <- eqs
+            } yield {
+              val from = Json.obj("_to" -> eq.dimension, "time_unit" -> eq.tq.q.toString, "time_value" -> eq.tq.ts)
+              val to = Json.obj("_to" -> eq.dimension, "time_unit" -> eq.tq.q.toString, "time_value" -> eq.tq.ts)
+              val interval = Json.obj("from" -> from, "to" -> to)
+              Json.obj("limit" -> 1, "label" -> labelName, "interval" -> interval)
+            }
+          }
+          Json.obj("step" -> stepLs)
+        }
+        val query = Json.obj("srcVertices" -> Json.arr(src), "steps" -> Json.arr(step))
+        //    println(s"query: ${query.toString()}")
+
+        wsClient.url(s"$s2graphReadOnlyUrl/graphs/getEdges").post(query).map { resp =>
+          resp.status match {
+            case HttpStatus.SC_OK =>
+              val respJs = resp.json
+              val keyWithValues = (respJs \ "results").as[Seq[JsValue]].map { result =>
+                resultToExactKeyValues(policy, result)
+              }.groupBy(_._1).mapValues(seq => seq.map(_._2).toMap)
+              for {
+                (key, eqWithValues) <- keyWithValues.toSeq
+              } yield {
+                FetchedCounts(key, eqWithValues)
+              }
+            case _ =>
+              Nil
+          }
+        }
+      case None =>
+        Future.successful(Nil)
+    }
+  }
+
+  // for query exact qualifier
+  override def get(policy: Counter, queries: Seq[(ExactKeyTrait, Seq[core.ExactQualifier])])
+                  (implicit ex: ExecutionContext): Future[Seq[FetchedCounts]] = {
+    val futures = {
+      for {
+        (key, eqs) <- queries
+      } yield {
+//        println(s"$key $eqs")
+        getInner(policy, key, eqs)
+      }
+    }
+    Future.sequence(futures).map(_.flatten)
+  }
+
+  override def getBlobValue(policy: Counter, blobId: String): Option[String] = {
+    throw new RuntimeException("unsupported getBlobValue operation")
+  }
+
+  override def insertBlobValue(policy: Counter, keys: Seq[BlobExactKey]): Seq[Boolean] = {
+    throw new RuntimeException("unsupported insertBlobValue operation")
+  }
+
+  private def existsLabel(policy: Counter, useCache: Boolean = true): Boolean = {
+    val action = policy.action
+    val counterLabelName = action + labelPostfix
+
+    Label.findByName(counterLabelName, useCache).nonEmpty
+  }
+
+  override def prepare(policy: Counter): Unit = {
+    val service = policy.service
+    val action = policy.action
+
+    val graphLabel = Label.findByName(action)
+    if (graphLabel.isEmpty) {
+      throw new Exception(s"label not found. $service.$action")
+    }
+
+    if (!existsLabel(policy, useCache = false)) {
+      val label = Label.findByName(action, useCache = false).get
+
+      val counterLabelName = action + labelPostfix
+      val defaultJson =
+        s"""
+           |{
+           |  "label": "$counterLabelName",
+           |  "srcServiceName": "$service",
+           |  "srcColumnName": "${label.tgtColumnName}",
+           |  "srcColumnType": "${label.tgtColumnType}",
+           |  "tgtServiceName": "$SERVICE_NAME",
+           |  "tgtColumnName": "$COLUMN_NAME",
+           |  "tgtColumnType": "string",
+           |  "indices": [
+           |    {"name": "time", "propNames": ["_to", "time_unit", "time_value"]}
+           |  ],
+           |  "props": [
+           |    {"name": "time_unit", "dataType": "string", "defaultValue": ""},
+           |    {"name": "time_value", "dataType": "long", "defaultValue": 0}
+           |  ],
+           |  "hTableName": "${policy.hbaseTable.get}"
+           |}
+        """.stripMargin
+      val json = policy.dailyTtl.map(ttl => ttl * 24 * 60 * 60) match {
+        case Some(ttl) =>
+          Json.parse(defaultJson).as[JsObject] + ("hTableTTL" -> Json.toJson(ttl))
+        case None =>
+          Json.parse(defaultJson)
+      }
+
+      graphOp.createLabel(json)
+    }
+  }
+
+  override def destroy(policy: Counter): Unit = {
+    val action = policy.action
+
+    if (existsLabel(policy, useCache = false)) {
+      val counterLabelName = action + labelPostfix
+
+      graphOp.deleteLabel(counterLabelName)
+    }
+  }
+
+  override def ready(policy: Counter): Boolean = {
+    existsLabel(policy)
+  }
+
+  // for range query
+  override def get(policy: Counter, items: Seq[String], timeRange: Seq[(core.TimedQualifier, core.TimedQualifier)])
+                  (implicit ec: ExecutionContext): Future[Seq[FetchedCounts]] = {
+    throw new NotImplementedError("Not implemented")
+  }
+}
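
For reference, toIncrementCountRequests above turns each (key, qualifier, delta) into one edge mutation; the body POSTed to /graphs/edges/incrementCount is a JSON array of objects shaped like this sketch (values invented):

    import play.api.libs.json.Json

    val req = Json.obj(
      "timestamp" -> 1450000000000L,
      "operation" -> "incrementCount",
      "from"      -> "item.1",        // exactKey.itemKey
      "to"        -> "age.32",        // eq.dimension
      "label"     -> "click_counts",  // policy.action + labelPostfix
      "props"     -> Json.obj("_count" -> 1, "time_unit" -> "d", "time_value" -> 1449964800000L)
    )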

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/v2/GraphOperation.scala
----------------------------------------------------------------------
diff --git a/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/v2/GraphOperation.scala b/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/v2/GraphOperation.scala
new file mode 100644
index 0000000..d1bb2ef
--- /dev/null
+++ b/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/v2/GraphOperation.scala
@@ -0,0 +1,48 @@
+package org.apache.s2graph.counter.core.v2
+
+import com.typesafe.config.Config
+import org.apache.http.HttpStatus
+import org.apache.s2graph.counter.config.S2CounterConfig
+import org.slf4j.LoggerFactory
+import play.api.libs.json.{JsObject, JsValue, Json}
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
+class GraphOperation(config: Config) {
+  // using play-ws without play app
+  private val builder = new com.ning.http.client.AsyncHttpClientConfig.Builder()
+  private val wsClient = new play.api.libs.ws.ning.NingWSClient(builder.build)
+  private val s2config = new S2CounterConfig(config)
+  val s2graphUrl = s2config.GRAPH_URL
+  private[counter] val log = LoggerFactory.getLogger(this.getClass)
+
+  import scala.concurrent.ExecutionContext.Implicits.global
+
+  def createLabel(json: JsValue): Boolean = {
+    // fix counter label's schemaVersion
+    val newJson = json.as[JsObject] ++ Json.obj("schemaVersion" -> "v2")
+    val future = wsClient.url(s"$s2graphUrl/graphs/createLabel").post(newJson).map { resp =>
+      resp.status match {
+        case HttpStatus.SC_OK =>
+          true
+        case _ =>
+          throw new RuntimeException(s"failed createLabel. errCode: 
${resp.status} body: ${resp.body} query: $json")
+      }
+    }
+
+    Await.result(future, 10 seconds)
+  }
+
+  def deleteLabel(label: String): Boolean = {
+    val future = wsClient.url(s"$s2graphUrl/graphs/deleteLabel/$label").put("").map { resp =>
+      resp.status match {
+        case HttpStatus.SC_OK =>
+          true
+        case _ =>
+          throw new RuntimeException(s"failed deleteLabel. errCode: 
${resp.status} body: ${resp.body}")
+      }
+    }
+
+    Await.result(future, 10 seconds)
+  }
+}
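
For orientation, a usage sketch of GraphOperation (the config and label JSON below are hypothetical; GRAPH_URL must point at a running s2graph REST endpoint):

    import com.typesafe.config.ConfigFactory
    import play.api.libs.json.Json

    val config = ConfigFactory.load()      // must supply the s2counter graph URL settings
    val graphOp = new GraphOperation(config)

    // createLabel posts to /graphs/createLabel, forces schemaVersion "v2",
    // blocks up to 10 seconds, and throws on any non-200 response
    val labelJson = Json.parse("""{"label": "my_counter_topK", "srcServiceName": "s2counter"}""")
    graphOp.createLabel(labelJson)
    graphOp.deleteLabel("my_counter_topK")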

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/v2/RankingStorageGraph.scala
----------------------------------------------------------------------
diff --git 
a/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/v2/RankingStorageGraph.scala
 
b/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/v2/RankingStorageGraph.scala
new file mode 100644
index 0000000..add4b04
--- /dev/null
+++ 
b/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/v2/RankingStorageGraph.scala
@@ -0,0 +1,395 @@
+package org.apache.s2graph.counter.core.v2
+
+import com.typesafe.config.Config
+import org.apache.commons.httpclient.HttpStatus
+import org.apache.s2graph.core.GraphUtil
+import org.apache.s2graph.core.mysqls.Label
+import org.apache.s2graph.counter.config.S2CounterConfig
+import org.apache.s2graph.counter.core.RankingCounter.RankingValueMap
+import org.apache.s2graph.counter.core.{RankingResult, RankingKey, RankingStorage}
+import org.apache.s2graph.counter.models.{Counter, CounterModel}
+import org.apache.s2graph.counter.util.{CollectionCacheConfig, CollectionCache}
+import org.slf4j.LoggerFactory
+import play.api.libs.json.{JsObject, JsString, JsValue, Json}
+import scala.concurrent.duration._
+import scala.concurrent.{Await, Future}
+import scala.util.hashing.MurmurHash3
+
+object RankingStorageGraph {
+  // using play-ws without play app
+  private val builder = new com.ning.http.client.AsyncHttpClientConfig.Builder()
+  private val wsClient = new play.api.libs.ws.ning.NingWSClient(builder.build)
+}
+
+class RankingStorageGraph(config: Config) extends RankingStorage {
+  import RankingStorageGraph._
+
+  private[counter] val log = LoggerFactory.getLogger(this.getClass)
+  private val s2config = new S2CounterConfig(config)
+
+  private val BUCKET_SHARD_COUNT = 53
+  private val SERVICE_NAME = "s2counter"
+  private val BUCKET_COLUMN_NAME = "bucket"
+  private val counterModel = new CounterModel(config)
+  private val labelPostfix = "_topK"
+
+  val s2graphUrl = s2config.GRAPH_URL
+  val s2graphReadOnlyUrl = s2config.GRAPH_READONLY_URL
+
+  val prepareCache = new CollectionCache[Option[Boolean]](CollectionCacheConfig(10000, 600))
+  val graphOp = new GraphOperation(config)
+  import scala.concurrent.ExecutionContext.Implicits.global
+
+  private def makeBucketKey(rankingKey: RankingKey): String = {
+    val eq = rankingKey.eq
+    val tq = eq.tq
+    s"${tq.q}.${tq.ts}.${eq.dimension}"
+  }
+
+  // "", "age.32", "age.gender.32.M"
+  private def makeBucketShardKey(shardIdx: Int, rankingKey: RankingKey): 
String = {
+    s"$shardIdx.${makeBucketKey(rankingKey)}"
+  }
+
+  /**
+   * indexProps: ["time_unit", "time_value", "score"]
+   */
+  override def getTopK(key: RankingKey, k: Int): Option[RankingResult] = {
+    getTopK(Seq(key), k).headOption.map(_._2)
+  }
+
+  override def getTopK(keys: Seq[RankingKey], k: Int): Seq[(RankingKey, RankingResult)] = {
+    val futures = for {
+      key <- keys
+    } yield {
+      getEdges(key).map { edges =>
+        key -> RankingResult(0d, toWithScoreLs(edges).take(k))
+      }
+    }
+
+    Await.result(Future.sequence(futures), 10 seconds)
+  }
+
+  override def update(key: RankingKey, value: RankingValueMap, k: Int): Unit = {
+    update(Seq((key, value)), k)
+  }
+
+  override def update(values: Seq[(RankingKey, RankingValueMap)], k: Int): Unit = {
+    val futures = {
+      for {
+        (key, value) <- values
+      } yield {
+        // prepare dimension bucket edge
+        checkAndPrepareDimensionBucket(key)
+
+        val future = getEdges(key, "raw").flatMap { edges =>
+          val prevRankingSeq = toWithScoreLs(edges)
+          val prevRankingMap: Map[String, Double] = 
prevRankingSeq.groupBy(_._1).map(_._2.sortBy(-_._2).head)
+          val currentRankingMap: Map[String, Double] = value.mapValues(_.score)
+          val mergedRankingSeq = (prevRankingMap ++ 
currentRankingMap).toSeq.sortBy(-_._2).take(k)
+          val mergedRankingMap = mergedRankingSeq.toMap
+
+          val bucketRankingSeq = mergedRankingSeq.groupBy { case (itemId, 
score) =>
+            // 0-index
+            GraphUtil.transformHash(MurmurHash3.stringHash(itemId)) % 
BUCKET_SHARD_COUNT
+          }.map { case (shardIdx, groupedRanking) =>
+            shardIdx -> groupedRanking.filter { case (itemId, _) => 
currentRankingMap.contains(itemId) }
+          }.toSeq
+
+          insertBulk(key, bucketRankingSeq).flatMap { _ =>
+            val duplicatedItems = prevRankingMap.filterKeys(s => 
currentRankingMap.contains(s))
+            val cutoffItems = prevRankingMap.filterKeys(s => 
!mergedRankingMap.contains(s))
+            val deleteItems = duplicatedItems ++ cutoffItems
+
+            val keyWithEdgesLs = prevRankingSeq.map(_._1).zip(edges)
+            val deleteEdges = keyWithEdgesLs.filter{ case (s, _) => 
deleteItems.contains(s) }.map(_._2)
+
+            deleteAll(deleteEdges)
+          }
+        }
+
+        future
+      }
+    }
+
+    Await.result(Future.sequence(futures), 10 seconds)
+  }
+
+  private def toWithScoreLs(edges: List[JsValue]): List[(String, Double)] = {
+    for {
+      edgeJson <- edges
+      to = (edgeJson \ "to").as[JsValue]
+      score = (edgeJson \ "score").as[JsValue].toString().toDouble
+    } yield {
+      val toValue = to match {
+        case s: JsString => s.as[String]
+        case _ => to.toString()
+      }
+      toValue -> score
+    }
+  }
+
+  private def insertBulk(key: RankingKey, newRankingSeq: Seq[(Int, Seq[(String, Double)])]): Future[Boolean] = {
+    val labelName = counterModel.findById(key.policyId).get.action + labelPostfix
+    val timestamp: Long = System.currentTimeMillis
+    val payload = Json.toJson {
+      for {
+        (shardIdx, rankingSeq) <- newRankingSeq
+        (itemId, score) <- rankingSeq
+      } yield {
+        val srcId = makeBucketShardKey(shardIdx, key)
+        Json.obj(
+          "timestamp" -> timestamp,
+          "from" -> srcId,
+          "to" -> itemId,
+          "label" -> labelName,
+          "props" -> Json.obj(
+            "time_unit" -> key.eq.tq.q.toString,
+            "time_value" -> key.eq.tq.ts,
+            "date_time" -> key.eq.tq.dateTime,
+            "score" -> score
+          )
+        )
+      }
+    }
+
+    wsClient.url(s"$s2graphUrl/graphs/edges/insertBulk").post(payload).map { 
resp =>
+      resp.status match {
+        case HttpStatus.SC_OK =>
+          true
+        case _ =>
+          throw new RuntimeException(s"failed insertBulk. errCode: 
${resp.status}, body: ${resp.body}, query: $payload")
+      }
+    }
+  }
+
+  private def deleteAll(edges: List[JsValue]): Future[Boolean] = {
+    // /graphs/edges/delete
+    val futures = {
+      for {
+        groupedEdges <- edges.grouped(50)
+      } yield {
+        val payload = Json.toJson(groupedEdges)
+        wsClient.url(s"$s2graphUrl/graphs/edges/delete").post(payload).map { 
resp =>
+          resp.status match {
+            case HttpStatus.SC_OK =>
+              true
+            case _ =>
+              log.error(s"failed delete. errCode: ${resp.status}, body: 
${resp.body}, query: $payload")
+              false
+          }
+        }
+      }
+    }.toSeq
+
+    Future.sequence(futures).map(_.forall(identity))
+  }
+
+  /** select and delete */
+  override def delete(key: RankingKey): Unit = {
+    val future = getEdges(key).flatMap { edges =>
+      deleteAll(edges)
+    }
+    Await.result(future, 10 seconds)
+  }
+
+  private def getEdges(key: RankingKey, duplicate: String = "first"): Future[List[JsValue]] = {
+    val labelName = counterModel.findById(key.policyId).get.action + labelPostfix
+
+    val ids = {
+      (0 until BUCKET_SHARD_COUNT).map { shardIdx =>
+        s""""${makeBucketShardKey(shardIdx, key)}""""
+      }
+    }.mkString(",")
+
+    val strJs =
+      s"""
+         |{
+         |    "srcVertices": [
+         |        {
+         |            "serviceName": "$SERVICE_NAME",
+         |            "columnName": "$BUCKET_COLUMN_NAME",
+         |            "ids": [$ids]
+         |        }
+         |    ],
+         |    "steps": [
+         |        {
+         |            "step": [
+         |                {
+         |                    "label": "$labelName",
+         |                    "duplicate": "$duplicate",
+         |                    "direction": "out",
+         |                    "offset": 0,
+         |                    "limit": -1,
+         |                    "interval": {
+         |                      "from": {"time_unit": 
"${key.eq.tq.q.toString}", "time_value": ${key.eq.tq.ts}},
+         |                      "to": {"time_unit": "${key.eq.tq.q.toString}", 
"time_value": ${key.eq.tq.ts}}
+         |                    },
+         |                    "scoring": {"score": 1}
+         |                }
+         |            ]
+         |        }
+         |    ]
+         |}
+       """.stripMargin
+    log.debug(strJs)
+
+    val payload = Json.parse(strJs)
+    wsClient.url(s"$s2graphReadOnlyUrl/graphs/getEdges").post(payload).map { 
resp =>
+      resp.status match {
+        case HttpStatus.SC_OK =>
+          (resp.json \ "results").asOpt[List[JsValue]].getOrElse(Nil)
+        case _ =>
+          throw new RuntimeException(s"failed getEdges. errCode: 
${resp.status}, body: ${resp.body}, query: $payload")
+      }
+    }
+  }
+
+  private def existsLabel(policy: Counter, useCache: Boolean = true): Boolean = {
+    val action = policy.action
+    val counterLabelName = action + labelPostfix
+
+    Label.findByName(counterLabelName, useCache).nonEmpty
+  }
+
+  private def checkAndPrepareDimensionBucket(rankingKey: RankingKey): Boolean = {
+    val dimension = rankingKey.eq.dimension
+    val bucketKey = makeBucketKey(rankingKey)
+    val labelName = "s2counter_topK_bucket"
+
+    val prepared = prepareCache.withCache(s"$dimension:$bucketKey") {
+      val checkReqJs = Json.arr(
+        Json.obj(
+          "label" -> labelName,
+          "direction" -> "out",
+          "from" -> dimension,
+          "to" -> makeBucketShardKey(BUCKET_SHARD_COUNT - 1, rankingKey)
+        )
+      )
+
+      val future = wsClient.url(s"$s2graphReadOnlyUrl/graphs/checkEdges").post(checkReqJs).map { resp =>
+        resp.status match {
+          case HttpStatus.SC_OK =>
+            // a non-empty result means the bucket edges already exist
+            resp.json.as[Seq[JsValue]].nonEmpty
+          case _ =>
+            throw new Exception(s"failed checkEdges. ${resp.body} ${resp.status}")
+        }
+      }.flatMap {
+        case true => Future.successful(Some(true))
+        case false =>
+          val insertReqJsLs = {
+            for {
+              i <- 0 until BUCKET_SHARD_COUNT
+            } yield {
+              Json.obj(
+                "timestamp" -> rankingKey.eq.tq.ts,
+                "from" -> dimension,
+                "to" -> makeBucketShardKey(i, rankingKey),
+                "label" -> labelName,
+                "props" -> Json.obj(
+                  "time_unit" -> rankingKey.eq.tq.q.toString,
+                  "date_time" -> rankingKey.eq.tq.dateTime
+                )
+              )
+            }
+          }
+          wsClient.url(s"$s2graphUrl/graphs/edges/insert").post(Json.toJson(insertReqJsLs)).map { resp =>
+            resp.status match {
+              case HttpStatus.SC_OK =>
+                Some(true)
+              case _ =>
+                throw new Exception(s"failed insertEdges. ${resp.body} ${resp.status}")
+            }
+          }
+      }.recover {
+        case e: Exception =>
+          log.error(s"failed to prepare dimension bucket: $e")
+          None
+      }
+
+      Await.result(future, 10 seconds)
+    }
+    prepared.getOrElse(false)
+  }
+
+  override def prepare(policy: Counter): Unit = {
+    val service = policy.service
+    val action = policy.action
+
+    val graphLabel = {
+      policy.rateActionId match {
+        case Some(rateId) =>
+          counterModel.findById(rateId, useCache = false).flatMap { ratePolicy =>
+            Label.findByName(ratePolicy.action)
+          }
+        case None =>
+          Label.findByName(action)
+      }
+    }
+    if (graphLabel.isEmpty) {
+      throw new Exception(s"label not found. $service.$action")
+    }
+
+    if (!existsLabel(policy, useCache = false)) {
+      // the input label, which specifies the target column, was already resolved above (via rateActionId if defined)
+      val label = graphLabel.get
+
+      val counterLabelName = action + labelPostfix
+      val defaultJson =
+        s"""
+           |{
+           |  "label": "$counterLabelName",
+           |  "srcServiceName": "$SERVICE_NAME",
+           |  "srcColumnName": "$BUCKET_COLUMN_NAME",
+           |  "srcColumnType": "string",
+           |  "tgtServiceName": "$service",
+           |  "tgtColumnName": "${label.tgtColumnName}",
+           |  "tgtColumnType": "${label.tgtColumnType}",
+           |  "indices": [
+           |    {"name": "time", "propNames": ["time_unit", "time_value", 
"score"]}
+           |  ],
+           |  "props": [
+           |    {"name": "time_unit", "dataType": "string", "defaultValue": 
""},
+           |    {"name": "time_value", "dataType": "long", "defaultValue": 0},
+           |    {"name": "date_time", "dataType": "long", "defaultValue": 0},
+           |    {"name": "score", "dataType": "float", "defaultValue": 0.0}
+           |  ],
+           |  "hTableName": "${policy.hbaseTable.get}"
+           |}
+         """.stripMargin
+      val json = policy.dailyTtl.map(ttl => ttl * 24 * 60 * 60) match {
+        case Some(ttl) =>
+          Json.parse(defaultJson).as[JsObject] + ("hTableTTL" -> Json.toJson(ttl))
+        case None =>
+          Json.parse(defaultJson)
+      }
+
+      graphOp.createLabel(json)
+    }
+  }
+
+  override def destroy(policy: Counter): Unit = {
+    val action = policy.action
+
+    if (existsLabel(policy, useCache = false)) {
+      val counterLabelName = action + labelPostfix
+
+      graphOp.deleteLabel(counterLabelName)
+    }
+  }
+
+  override def ready(policy: Counter): Boolean = {
+    existsLabel(policy)
+  }
+}
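
To make the bucket sharding concrete: every ranked item is routed to one of the 53 shards by hashing its id, reads fan out across all shard keys, and writes touch only the shards whose items changed. A small routing sketch (the bucket key is a sample value; the layout mirrors makeBucketKey/makeBucketShardKey above):

    import org.apache.s2graph.core.GraphUtil
    import scala.util.hashing.MurmurHash3

    val BUCKET_SHARD_COUNT = 53
    val bucketKey = "M.1443625200000.age.32"   // "${tq.q}.${tq.ts}.${dimension}", sample values

    def shardKeyFor(itemId: String): String = {
      val shardIdx = GraphUtil.transformHash(MurmurHash3.stringHash(itemId)) % BUCKET_SHARD_COUNT
      s"$shardIdx.$bucketKey"                  // same layout as makeBucketShardKey
    }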

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2counter_core/src/main/scala/org/apache/s2graph/counter/decay/DecayFormula.scala
----------------------------------------------------------------------
diff --git 
a/s2counter_core/src/main/scala/org/apache/s2graph/counter/decay/DecayFormula.scala
 
b/s2counter_core/src/main/scala/org/apache/s2graph/counter/decay/DecayFormula.scala
new file mode 100644
index 0000000..4e5da90
--- /dev/null
+++ 
b/s2counter_core/src/main/scala/org/apache/s2graph/counter/decay/DecayFormula.scala
@@ -0,0 +1,6 @@
+package org.apache.s2graph.counter.decay
+
+trait DecayFormula {
+  def apply(value: Double, millis: Long): Double
+  def apply(value: Double, seconds: Int): Double
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2counter_core/src/main/scala/org/apache/s2graph/counter/decay/ExpDecayFormula.scala
----------------------------------------------------------------------
diff --git 
a/s2counter_core/src/main/scala/org/apache/s2graph/counter/decay/ExpDecayFormula.scala
 
b/s2counter_core/src/main/scala/org/apache/s2graph/counter/decay/ExpDecayFormula.scala
new file mode 100644
index 0000000..1075421
--- /dev/null
+++ 
b/s2counter_core/src/main/scala/org/apache/s2graph/counter/decay/ExpDecayFormula.scala
@@ -0,0 +1,25 @@
+package org.apache.s2graph.counter.decay
+
+case class ExpDecayFormula(halfLifeInMillis: Double) extends DecayFormula {
+  val decayRate = - Math.log(2) / halfLifeInMillis
+
+  override def apply(value: Double, millis: Long): Double = {
+    value * Math.pow(Math.E, decayRate * millis)
+  }
+
+  override def apply(value: Double, seconds: Int): Double = {
+    apply(value, seconds * 1000L)
+  }
+}
+
+object ExpDecayFormula {
+  @deprecated("do not use. just experimental", "0.14")
+  def byWindowTime(windowInMillis: Long, pct: Double): ExpDecayFormula = {
+    val halfLife = windowInMillis * Math.log(0.5) / Math.log(pct)
+    ExpDecayFormula(halfLife)
+  }
+
+  def byMeanLifeTime(millis: Long): ExpDecayFormula = {
+    ExpDecayFormula(millis * Math.log(2))
+  }
+}
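
As a sanity check on the formula: after exactly one half-life the value halves, since value * e^(decayRate * halfLife) = value * e^(-ln 2) = value / 2. A minimal sketch (the one-hour half-life is a sample value):

    import org.apache.s2graph.counter.decay.ExpDecayFormula

    val halfLifeMillis = 60 * 60 * 1000L          // one hour
    val formula = ExpDecayFormula(halfLifeMillis.toDouble)

    formula(100.0, halfLifeMillis)                // ~50.0 after one half-life
    formula(100.0, 2 * halfLifeMillis)            // ~25.0 after two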

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2counter_core/src/main/scala/org/apache/s2graph/counter/helper/CounterAdmin.scala
----------------------------------------------------------------------
diff --git 
a/s2counter_core/src/main/scala/org/apache/s2graph/counter/helper/CounterAdmin.scala
 
b/s2counter_core/src/main/scala/org/apache/s2graph/counter/helper/CounterAdmin.scala
new file mode 100644
index 0000000..cae2245
--- /dev/null
+++ 
b/s2counter_core/src/main/scala/org/apache/s2graph/counter/helper/CounterAdmin.scala
@@ -0,0 +1,113 @@
+package org.apache.s2graph.counter.helper
+
+import com.typesafe.config.Config
+import org.apache
+import org.apache.s2graph.core.Graph
+import org.apache.s2graph.core.mysqls.Label
+import org.apache.s2graph.counter
+import org.apache.s2graph.counter.config.S2CounterConfig
+import org.apache.s2graph.counter.core.{RankingCounter, ExactCounter}
+import org.apache.s2graph.counter.core.v1.{RankingStorageRedis, ExactStorageAsyncHBase}
+import org.apache.s2graph.counter.core.v2.{RankingStorageGraph, ExactStorageGraph, GraphOperation}
+import org.apache.s2graph.counter.models.{Counter, CounterModel}
+import play.api.libs.json.Json
+
+import scala.util.Try
+
+class CounterAdmin(config: Config) {
+   val s2config = new S2CounterConfig(config)
+   val counterModel = new CounterModel(config)
+   val graphOp = new GraphOperation(config)
+   val s2graph = new Graph(config)(scala.concurrent.ExecutionContext.global)
+   val storageManagement = new org.apache.s2graph.core.Management(s2graph)
+
+   def setupCounterOnGraph(): Unit = {
+     // create s2counter service
+     val service = "s2counter"
+     storageManagement.createService(service, s2config.HBASE_ZOOKEEPER_QUORUM, s"$service-${config.getString("phase")}", 1, None, "gz")
+     // create bucket label
+     val label = "s2counter_topK_bucket"
+     if (Label.findByName(label, useCache = false).isEmpty) {
+       val strJs =
+         s"""
+            |{
+            |  "label": "$label",
+            |  "srcServiceName": "s2counter",
+            |  "srcColumnName": "dimension",
+            |  "srcColumnType": "string",
+            |  "tgtServiceName": "s2counter",
+            |  "tgtColumnName": "bucket",
+            |  "tgtColumnType": "string",
+            |  "indices": [
+            |    {"name": "time", "propNames": ["time_unit", "date_time"]}
+            |  ],
+            |  "props": [
+            |      {"name": "time_unit", "dataType": "string", "defaultValue": 
""},
+            |      {"name": "date_time", "dataType": "long", "defaultValue": 0}
+            |  ],
+            |  "hTableName": "s2counter_60",
+            |  "hTableTTL": 5184000
+            |}
+         """.stripMargin
+       graphOp.createLabel(Json.parse(strJs))
+     }
+   }
+
+   def createCounter(policy: Counter): Unit = {
+     val newPolicy = policy.copy(hbaseTable = Some(makeHTableName(policy)))
+     prepareStorage(newPolicy)
+     counterModel.createServiceAction(newPolicy)
+   }
+
+   def deleteCounter(service: String, action: String): Option[Try[Unit]] = {
+     for {
+       policy <- counterModel.findByServiceAction(service, action, useCache = false)
+     } yield {
+       Try {
+         exactCounter(policy).destroy(policy)
+         if (policy.useRank) {
+           rankingCounter(policy).destroy(policy)
+         }
+         counterModel.deleteServiceAction(policy)
+       }
+     }
+   }
+
+   def prepareStorage(policy: Counter): Unit = {
+     if (policy.rateActionId.isEmpty) {
+       // when a rate action is defined, the exact counter is not used
+       exactCounter(policy).prepare(policy)
+     }
+     if (policy.useRank) {
+       rankingCounter(policy).prepare(policy)
+     }
+   }
+
+   def prepareStorage(policy: Counter, version: Byte): Unit = {
+     // prepares storage using the given version parameter instead of policy.version
+     prepareStorage(policy.copy(version = version))
+   }
+
+   private val exactCounterMap = Map(
+     counter.VERSION_1 -> new ExactCounter(config, new ExactStorageAsyncHBase(config)),
+     counter.VERSION_2 -> new ExactCounter(config, new ExactStorageGraph(config))
+   )
+   private val rankingCounterMap = Map(
+     apache.s2graph.counter.VERSION_1 -> new RankingCounter(config, new RankingStorageRedis(config)),
+     apache.s2graph.counter.VERSION_2 -> new RankingCounter(config, new RankingStorageGraph(config))
+   )
+
+   private val tablePrefixMap = Map(
+     apache.s2graph.counter.VERSION_1 -> "s2counter",
+     apache.s2graph.counter.VERSION_2 -> "s2counter_v2"
+   )
+
+   def exactCounter(version: Byte): ExactCounter = exactCounterMap(version)
+   def exactCounter(policy: Counter): ExactCounter = exactCounter(policy.version)
+   def rankingCounter(version: Byte): RankingCounter = rankingCounterMap(version)
+   def rankingCounter(policy: Counter): RankingCounter = rankingCounter(policy.version)
+
+   def makeHTableName(policy: Counter): String = {
+     (Seq(tablePrefixMap(policy.version), policy.service, policy.ttl) ++ policy.dailyTtl).mkString("_")
+   }
+ }
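
A usage sketch for the admin helper (the service and action names are hypothetical; the config must carry the phase, HBase, and counter DB settings the class reads):

    import com.typesafe.config.ConfigFactory
    import org.apache.s2graph.counter.helper.CounterAdmin

    val admin = new CounterAdmin(ConfigFactory.load())
    admin.setupCounterOnGraph()                    // one-time: s2counter service + bucket label

    // tears down exact/ranking storage and removes the policy row;
    // returns None when no policy exists for the service/action pair
    admin.deleteCounter("my_service", "view")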

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2counter_core/src/main/scala/org/apache/s2graph/counter/helper/DistributedScanner.scala
----------------------------------------------------------------------
diff --git 
a/s2counter_core/src/main/scala/org/apache/s2graph/counter/helper/DistributedScanner.scala
 
b/s2counter_core/src/main/scala/org/apache/s2graph/counter/helper/DistributedScanner.scala
new file mode 100644
index 0000000..c06cabe
--- /dev/null
+++ 
b/s2counter_core/src/main/scala/org/apache/s2graph/counter/helper/DistributedScanner.scala
@@ -0,0 +1,70 @@
+package org.apache.s2graph.counter.helper
+
+import java.util
+import java.util.Comparator
+
+import com.google.common.primitives.SignedBytes
+import org.apache.hadoop.hbase.client._
+import org.apache.hadoop.hbase.util.Bytes
+
+object DistributedScanner {
+   val BUCKET_BYTE_SIZE = Bytes.SIZEOF_BYTE
+
+   def getRealRowKey(result: Result): Array[Byte] = {
+     result.getRow.drop(BUCKET_BYTE_SIZE)
+   }
+ }
+
+class DistributedScanner(table: Table, scan: Scan) extends AbstractClientScanner {
+   import DistributedScanner._
+
+   private val BYTE_MAX = BigInt(256)
+
+   private[helper] val scanners = {
+     for {
+       i <- 0 until BYTE_MAX.pow(BUCKET_BYTE_SIZE).toInt
+     } yield {
+       val bucketBytes: Array[Byte] = Bytes.toBytes(i).takeRight(BUCKET_BYTE_SIZE)
+       val newScan = new Scan(scan).setStartRow(bucketBytes ++ scan.getStartRow).setStopRow(bucketBytes ++ scan.getStopRow)
+       table.getScanner(newScan)
+     }
+   }
+
+   val resultCache = new util.TreeMap[Result, java.util.Iterator[Result]](new Comparator[Result] {
+     val comparator = SignedBytes.lexicographicalComparator()
+     override def compare(o1: Result, o2: Result): Int = {
+       comparator.compare(getRealRowKey(o1), getRealRowKey(o2))
+     }
+   })
+
+   lazy val initialized = {
+     val iterators = scanners.map(_.iterator()).filter(_.hasNext)
+     iterators.foreach { it =>
+       resultCache.put(it.next(), it)
+     }
+     iterators.nonEmpty
+   }
+
+   override def next(): Result = {
+     if (initialized) {
+       Option(resultCache.pollFirstEntry()).map { entry =>
+         val it = entry.getValue
+         if (it.hasNext) {
+           // fill cache
+           resultCache.put(it.next(), it)
+         }
+         entry.getKey
+       }.orNull
+     } else {
+       null
+     }
+   }
+
+   override def close(): Unit = {
+     for {
+       scanner <- scanners
+     } {
+       scanner.close()
+     }
+   }
+ }
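
The scanner fans one logical scan out to all 256 single-byte bucket prefixes and merge-sorts the results by the real (unprefixed) row key. A usage sketch (connection setup and the table/row-key range are sample values):

    import org.apache.hadoop.hbase.TableName
    import org.apache.hadoop.hbase.client.{ConnectionFactory, Scan}
    import org.apache.hadoop.hbase.util.Bytes
    import org.apache.s2graph.counter.helper.DistributedScanner

    val connection = ConnectionFactory.createConnection()
    val table = connection.getTable(TableName.valueOf("s2counter_v2"))
    val scanner = new DistributedScanner(table, new Scan(Bytes.toBytes("a"), Bytes.toBytes("z")))
    try {
      var result = scanner.next()
      while (result != null) {
        println(Bytes.toStringBinary(DistributedScanner.getRealRowKey(result)))
        result = scanner.next()
      }
    } finally scanner.close()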

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2counter_core/src/main/scala/org/apache/s2graph/counter/helper/HashShardingJedis.scala
----------------------------------------------------------------------
diff --git 
a/s2counter_core/src/main/scala/org/apache/s2graph/counter/helper/HashShardingJedis.scala
 
b/s2counter_core/src/main/scala/org/apache/s2graph/counter/helper/HashShardingJedis.scala
new file mode 100644
index 0000000..f970c34
--- /dev/null
+++ 
b/s2counter_core/src/main/scala/org/apache/s2graph/counter/helper/HashShardingJedis.scala
@@ -0,0 +1,153 @@
+package org.apache.s2graph.counter.helper
+
+import com.typesafe.config.Config
+import org.apache.s2graph.counter.config.S2CounterConfig
+import org.apache.s2graph.counter.util.Hashes
+import org.slf4j.LoggerFactory
+import redis.clients.jedis.exceptions.JedisException
+import redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig}
+
+class HashShardingJedis(config: Config) {
+   lazy val s2config = new S2CounterConfig(config)
+
+   private val log = LoggerFactory.getLogger(getClass)
+
+   val poolConfig = new JedisPoolConfig()
+   poolConfig.setMaxTotal(150)
+   poolConfig.setMaxIdle(50)
+   poolConfig.setMaxWaitMillis(200)
+
+   val jedisPools = s2config.REDIS_INSTANCES.map { case (host, port) =>
+     new JedisPool(poolConfig, host, port)
+   }
+   val jedisPoolSize = jedisPools.size
+
+   def getJedisPool(idx: Int): JedisPool = {
+     if (idx >= jedisPoolSize)
+       null
+     else
+       jedisPools(idx)
+   }
+
+   def getJedisPoolWithBucketname2(bucket: String): JedisPool = {
+     val hashedValue = Hashes.murmur3(bucket)
+     val idx = hashedValue % jedisPoolSize
+     getJedisPool(idx)
+   }
+
+   def getJedisPoolWithBucketname(bucket: String): (JedisPool, JedisPool) = {
+     val hashedValue = Hashes.murmur3(bucket)
+     val idx = hashedValue % jedisPoolSize
+     val secondaryIdx = if (jedisPoolSize <= 1) {
+       throw new Exception("sharding pool too small: need at least 2 instances")
+     } else {
+       // pick a second, distinct shard by skipping over the primary index
+       val newIdx = (hashedValue / jedisPoolSize) % (jedisPoolSize - 1)
+       if (newIdx < idx) newIdx else newIdx + 1
+     }
+     (getJedisPool(idx), getJedisPool(secondaryIdx))
+   }
+
+   def doBlockWithJedisInstace(f: Jedis => Any, fallBack: => Any, jedis: Jedis) = {
+     try {
+       f(jedis)
+     } catch {
+       case e: JedisException =>
+         fallBack
+     }
+   }
+
+   def doBlockWithBucketName(f: Jedis => Any, fallBack: => Any, bucket: String) = {
+     // only the primary shard is used here; the secondary (replicated) path is currently disabled
+     val jedisPool = getJedisPoolWithBucketname2(bucket)
+     if (jedisPool != null) {
+       var jedis: Jedis = null
+       try {
+         jedis = jedisPool.getResource()
+         log.info(s">> Jedis Pool Active Num : ${jedisPool.getNumActive}")
+         f(jedis)
+       } catch {
+         case e: JedisException =>
+           // return the broken connection so the pool can evict it, then fall back
+           jedisPool.returnBrokenResource(jedis)
+           jedis = null
+           fallBack
+       } finally {
+         if (jedis != null) jedisPool.returnResource(jedis)
+       }
+     } else {
+       fallBack
+     }
+   }
+
+   def doBlockWithKey[T](key: String)(f: Jedis => T)(fallBack: => T) = {
+     val (jedisPool1, jedisPool2) = getJedisPoolWithBucketname(key)
+     if (jedisPool1 != null && jedisPool2 != null) {
+       var jedis1: Jedis = null
+       var jedis2: Jedis = null
+       try {
+         jedis1 = jedisPool1.getResource()
+         jedis2 = jedisPool2.getResource()
+
+         // NOTE: f currently runs against the primary shard only;
+         // applying it to the secondary (jedis2) is disabled
+         f(jedis1)
+       } catch {
+         case e: JedisException =>
+           jedisPool1.returnBrokenResource(jedis1)
+           jedisPool2.returnBrokenResource(jedis2)
+           jedis1 = null
+           jedis2 = null
+           fallBack
+       } finally {
+         if (jedis1 != null) jedisPool1.returnResource(jedis1)
+         if (jedis2 != null) jedisPool2.returnResource(jedis2)
+       }
+     } else {
+       fallBack
+     }
+   }
+ }
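
Finally, a usage sketch of the sharded Redis helper (the Redis instance list comes from config and must contain at least two shards for doBlockWithKey; the key and command are sample values):

    import com.typesafe.config.ConfigFactory
    import org.apache.s2graph.counter.helper.HashShardingJedis

    val sharded = new HashShardingJedis(ConfigFactory.load())

    // run the block on the shard that owns the key; fall back to 0 on a Jedis failure
    val count = sharded.doBlockWithKey("counter:user:123") { jedis =>
      jedis.incr("counter:user:123").longValue
    }(0L)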
