http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2counter_core/src/main/scala/s2/counter/core/v1/ExactStorageHBase.scala
----------------------------------------------------------------------
diff --git a/s2counter_core/src/main/scala/s2/counter/core/v1/ExactStorageHBase.scala b/s2counter_core/src/main/scala/s2/counter/core/v1/ExactStorageHBase.scala
deleted file mode 100644
index a664de4..0000000
--- a/s2counter_core/src/main/scala/s2/counter/core/v1/ExactStorageHBase.scala
+++ /dev/null
@@ -1,327 +0,0 @@
-package s2.counter.core.v1
-
-import com.kakao.s2graph.core.Graph
-import com.typesafe.config.Config
-import org.apache.hadoop.hbase.CellUtil
-import org.apache.hadoop.hbase.client._
-import org.apache.hadoop.hbase.filter.{ColumnRangeFilter, FilterList}
-import org.apache.hadoop.hbase.util.Bytes
-import org.slf4j.LoggerFactory
-import s2.config.S2CounterConfig
-import s2.counter.core.ExactCounter.ExactValueMap
-import s2.counter.core._
-import s2.helper.{Management, WithHBase}
-import s2.models.Counter
-import s2.models.Counter.ItemType
-
-import scala.collection.JavaConversions._
-import scala.concurrent.{ExecutionContext, Future}
-import scala.util.{Failure, Success}
-
-
-class ExactStorageHBase(config: Config) extends ExactStorage {
-  import ExactStorageHBase._
-
-  private val log = LoggerFactory.getLogger(getClass)
-
-  lazy val s2config = new S2CounterConfig(config)
-
-  private[counter] val withHBase = new WithHBase(config)
-  private[counter] val hbaseManagement = new Management(config)
-
-
-
-  private def getTableName(policy: Counter): String = {
-    policy.hbaseTable.getOrElse(s2config.HBASE_TABLE_NAME)
-  }
-
-  override def get(policy: Counter,
-                   items: Seq[String],
-                   timeRange: Seq[(TimedQualifier, TimedQualifier)])
-                  (implicit ec: ExecutionContext): Future[Seq[FetchedCounts]] = {
-    lazy val messageForLog = s"${policy.service}.${policy.action} $items $timeRange"
-
-    val keys = {
-      for {
-        item <- items
-      } yield {
-        ExactKey(policy, item, checkItemType = true)
-      }
-    }
-
-    val gets = {
-      for {
-        key <- keys
-      } yield {
-        val get = new Get(BytesUtilV1.toBytes(key))
-        timeRange.map(t => intervalsMap(t._1.q)).distinct.foreach { cf =>
-          get.addFamily(cf.toString.getBytes)
-        }
-        get.setFilter(new FilterList(FilterList.Operator.MUST_PASS_ONE, {
-          for {
-            (from, to) <- timeRange
-          } yield {
-            new ColumnRangeFilter(
-              BytesUtilV1.toBytes(from), true,
-              BytesUtilV1.toBytes(to.copy(ts = to.ts + 1)), false)
-          }
-        }))
-      }
-    }
-
-    //    println(s"$messageForLog $gets")
-    Future {
-      withHBase(getTableName(policy)) { table =>
-        for {
-          (rst, key) <- table.get(gets).zip(keys) if !rst.isEmpty
-        } yield {
-          val qualifierWithCounts = {
-            for {
-              cell <- rst.listCells()
-              eq = BytesUtilV1.toExactQualifier(CellUtil.cloneQualifier(cell))
-            } yield {
-              eq -> Bytes.toLong(CellUtil.cloneValue(cell))
-            }
-          }.toMap
-          FetchedCounts(key, qualifierWithCounts)
-        }
-      } match {
-        case Success(rst) => rst
-        case Failure(ex) =>
-          log.error(s"$ex: $messageForLog")
-          Nil
-      }
-    }
-  }
-
-  override def get(policy: Counter,
-                   items: Seq[String],
-                   timeRange: Seq[(TimedQualifier, TimedQualifier)],
-                   dimQuery: Map[String, Set[String]])
-                  (implicit ec: ExecutionContext): Future[Seq[FetchedCountsGrouped]] = {
-    get(policy, items, timeRange).map { fetchedLs =>
-      for {
-        FetchedCounts(exactKey, qualifierWithCountMap) <- fetchedLs
-      } yield {
-        val intervalWithCountMap = qualifierWithCountMap
-          .filter { case (eq, v) => eq.checkDimensionEquality(dimQuery) }
-          .groupBy { case (eq, v) => (eq.tq.q, eq.dimKeyValues) }
-        FetchedCountsGrouped(exactKey, intervalWithCountMap)
-      }
-    }
-  }
-
-  override def update(policy: Counter, counts: Seq[(ExactKeyTrait, ExactValueMap)]): Map[ExactKeyTrait, ExactValueMap] = {
-    // increment mutation to hbase
-    val increments = {
-      for {
-        (exactKey, values) <- counts
-        inc = new Increment(BytesUtilV1.toBytes(exactKey))
-      } yield {
-        for {
-          (eq, value) <- values
-        } {
-          inc.addColumn(intervalsMap.apply(eq.tq.q).toString.getBytes, BytesUtilV1.toBytes(eq), value)
-        }
-        // add column by dimension
-        inc
-      }
-    }
-
-    val results: Array[Object] = Array.fill(increments.size)(null)
-
-    withHBase(getTableName(policy)) { table =>
-      table.batch(increments, results)
-    } match {
-      case Failure(ex) =>
-        log.error(s"${ex.getMessage}")
-      case _ =>
-    }
-
-    assert(counts.length == results.length)
-
-    for {
-      ((exactKey, eqWithValue), result) <- counts.zip(results)
-    } yield {
-      val eqWithResult = result match {
-        case r: Result =>
-          for {
-            (eq, value) <- eqWithValue
-          } yield {
-            val interval = eq.tq.q
-            val cf = intervalsMap(interval)
-            val result = Option(r.getColumnLatestCell(cf.toString.getBytes, BytesUtilV1.toBytes(eq))).map { cell =>
-              Bytes.toLong(CellUtil.cloneValue(cell))
-            }.getOrElse(-1l)
-            eq -> result
-          }
-        case ex: Throwable =>
-          log.error(s"${ex.getMessage}: $exactKey")
-          Nil
-        case _ =>
-          log.error(s"result is null: $exactKey")
-          Nil
-      }
-      (exactKey, eqWithResult.toMap)
-    }
-  }.toMap
-
-  override def delete(policy: Counter, keys: Seq[ExactKeyTrait]): Unit = {
-    withHBase(getTableName(policy)) { table =>
-      table.delete {
-        for {
-          key <- keys
-        } yield {
-          new Delete(BytesUtilV1.toBytes(key))
-        }
-      }
-    } match {
-      case Failure(ex) =>
-        log.error(ex.getMessage)
-      case _ =>
-    }
-  }
-
-  override def get(policy: Counter,
-                   queries: Seq[(ExactKeyTrait, Seq[ExactQualifier])])
-                  (implicit ec: ExecutionContext): Future[Seq[FetchedCounts]] = {
-    lazy val messageForLog = s"${policy.service}.${policy.action} $queries"
-
-    val gets = {
-      for {
-        (key, eqs) <- queries
-      } yield {
-        //        println(s"$key $eqsGrouped")
-        val get = new Get(BytesUtilV1.toBytes(key))
-
-        for {
-          eq <- eqs
-        } {
-          val cf = intervalsMap(eq.tq.q)
-          get.addColumn(cf.toString.getBytes, BytesUtilV1.toBytes(eq))
-        }
-        get
-      }
-    }
-
-    Future {
-      withHBase(getTableName(policy)) { table =>
-        for {
-          (rst, key) <- table.get(gets).zip(queries.map(_._1)) if !rst.isEmpty
-        } yield {
-          val qualifierWithCounts = {
-            for {
-              cell <- rst.listCells()
-              eq = BytesUtilV1.toExactQualifier(CellUtil.cloneQualifier(cell))
-            } yield {
-              eq -> Bytes.toLong(CellUtil.cloneValue(cell))
-            }
-          }.toMap
-          FetchedCounts(key, qualifierWithCounts)
-        }
-      } match {
-        case Success(rst) => rst.toSeq
-        case Failure(ex) =>
-          log.error(s"$ex: $messageForLog")
-          Nil
-      }
-    }
-  }
-
-  override def insertBlobValue(policy: Counter, keys: Seq[BlobExactKey]): Seq[Boolean] = {
-    val results: Array[Object] = Array.fill(keys.size)(null)
-
-    val puts = keys.map { key =>
-      val put = new Put(BytesUtilV1.toBytes(key))
-      put.addColumn(blobCF, blobColumn, key.itemId.getBytes)
-    }
-
-    withHBase(getTableName(policy)) { table =>
-      table.batch(puts, results)
-    } match {
-      case Failure(ex) =>
-        log.error(s"${ex.getMessage}")
-      case _ =>
-    }
-
-    for {
-      (result, key) <- results.zip(keys)
-    } yield {
-      Option(result).map(_ => true).getOrElse {
-        log.error(s"fail to insert blob value: $key")
-        false
-      }
-    }
-  }
-
-  override def getBlobValue(policy: Counter, blobId: String): Option[String] = {
-    lazy val messageForLog = s"${policy.service}.${policy.action}.$blobId"
-
-    policy.itemType match {
-      case ItemType.BLOB =>
-        withHBase(getTableName(policy)) { table =>
-          val rowKey = BytesUtilV1.toBytes(ExactKey(policy.id, policy.version, policy.itemType, blobId))
-          val get = new Get(rowKey)
-          get.addColumn(blobCF, blobColumn)
-          table.get(get)
-        } match {
-          case Success(result) =>
-            Option(result).filter(!_.isEmpty).map { rst =>
-              Bytes.toString(rst.getValue(blobCF, blobColumn))
-            }
-          case Failure(ex) =>
-            throw ex
-        }
-      case _ =>
-        log.warn(s"is not blob type counter. $messageForLog")
-        throw new Exception(s"is not blob type counter. $messageForLog")
-    }
-  }
-
-  override def prepare(policy: Counter): Unit = {
-    // create hbase table
-    policy.hbaseTable.foreach { table =>
-
-      if (!hbaseManagement.tableExists(s2config.HBASE_ZOOKEEPER_QUORUM, table)) {
-        hbaseManagement.createTable(s2config.HBASE_ZOOKEEPER_QUORUM, table,
-          ColumnFamily.values.map(_.toString).toList, 1)
-        hbaseManagement.setTTL(s2config.HBASE_ZOOKEEPER_QUORUM, table, ColumnFamily.SHORT.toString, policy.ttl)
-        policy.dailyTtl.foreach { i =>
-          hbaseManagement.setTTL(s2config.HBASE_ZOOKEEPER_QUORUM, table, ColumnFamily.LONG.toString, i * 24 * 60 * 60)
-        }
-      }
-    }
-  }
-
-  override def destroy(policy: Counter): Unit = {
-
-  }
-
-  override def ready(policy: Counter): Boolean = {
-    policy.hbaseTable.map { table =>
-      hbaseManagement.tableExists(s2config.HBASE_ZOOKEEPER_QUORUM, table)
-    }.getOrElse(true)
-  }
-}
-
-object ExactStorageHBase {
-  import TimedQualifier.IntervalUnit._
-
-  object ColumnFamily extends Enumeration {
-    type ColumnFamily = Value
-
-    val SHORT = Value("s")
-    val LONG = Value("l")
-  }
-
-  val blobCF = ColumnFamily.LONG.toString.getBytes
-  val blobColumn = "b".getBytes
-
-  val intervalsMap = Map(
-    MINUTELY -> ColumnFamily.SHORT,
-    HOURLY -> ColumnFamily.SHORT,
-    DAILY -> ColumnFamily.LONG,
-    MONTHLY -> ColumnFamily.LONG,
-    TOTAL -> ColumnFamily.LONG
-  )
-}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2counter_core/src/main/scala/s2/counter/core/v1/RankingStorageRedis.scala
----------------------------------------------------------------------
diff --git a/s2counter_core/src/main/scala/s2/counter/core/v1/RankingStorageRedis.scala b/s2counter_core/src/main/scala/s2/counter/core/v1/RankingStorageRedis.scala
deleted file mode 100644
index ea15a9c..0000000
--- a/s2counter_core/src/main/scala/s2/counter/core/v1/RankingStorageRedis.scala
+++ /dev/null
@@ -1,189 +0,0 @@
-package s2.counter.core.v1
-
-import java.lang
-
-import com.typesafe.config.Config
-import org.slf4j.LoggerFactory
-import redis.clients.jedis.Pipeline
-import s2.counter.core.RankingCounter.RankingValueMap
-import s2.counter.core.TimedQualifier.IntervalUnit
-import s2.counter.core.{RankingKey, RankingResult, RankingStorage}
-import s2.helper.WithRedis
-import s2.models.{Counter, CounterModel}
-
-import scala.collection.JavaConversions._
-import scala.util.{Failure, Success}
-
-/**
- * Created by hsleep([email protected]) on 15. 6. 22..
- */
-class RankingStorageRedis(config: Config) extends RankingStorage {
-  private[counter] val log = LoggerFactory.getLogger(this.getClass)
-  private[counter] val withRedis = new WithRedis(config)
-
-  val counterModel = new CounterModel(config)
-
-  val TOTAL = "_total_"
-
-  /**
-   * ex1)
-   * dimension = "age.32"
-   * ex2)
-   * dimension = "age.gender.32.m"
-   *
-   */
-  private def makeBucket(rankingKey: RankingKey): String = {
-    val policyId = rankingKey.policyId
-    val q = rankingKey.eq.tq.q
-    val ts = rankingKey.eq.tq.ts
-    val dimension = rankingKey.eq.dimension
-    if (dimension.nonEmpty) {
-      s"$policyId.$q.$ts.$dimension"
-    }
-    else {
-      s"$policyId.$q.$ts"
-    }
-  }
-
-  override def getTopK(rankingKey: RankingKey, k: Int): Option[RankingResult] = {
-    val bucket = makeBucket(rankingKey)
-    withRedis.doBlockWithKey(bucket) { jedis =>
-      jedis.zrevrangeByScoreWithScores(bucket, "+inf", "-inf", 0, k + 1).toSeq.map(t => (t.getElement, t.getScore))
-    } match {
-      case Success(values) =>
-        if (values.nonEmpty) {
-//          println(values)
-          Some(RankingResult(values.find(_._1 == TOTAL).map(_._2).getOrElse(-1d), values.filter(_._1 != TOTAL).take(k)))
-        }
-        else {
-          None
-        }
-      case Failure(ex) =>
-        log.error(s"fail to get top k($ex). $rankingKey")
-        None
-    }
-  }
-
-  private def getTTL(policyId: Int, intervalUnit: IntervalUnit.IntervalUnit): Option[Int] = {
-    counterModel.findById(policyId).flatMap { policy =>
-      intervalUnit match {
-        case IntervalUnit.MINUTELY => Some(policy.ttl)
-        case IntervalUnit.HOURLY => Some(policy.ttl)
-        // default daily ttl 31 day
-        case IntervalUnit.DAILY => Some(policy.dailyTtl.getOrElse(31) * 24 * 3600)
-        case IntervalUnit.MONTHLY => policy.dailyTtl
-        case IntervalUnit.TOTAL => policy.dailyTtl
-      }
-    }
-  }
-
-  override def update(key: RankingKey, value: RankingValueMap, k: Int): Unit = {
-    // update ranking by score
-    val bucket = makeBucket(key)
-    withRedis.doBlockWithKey(bucket) { jedis =>
-      val pipeline = jedis.pipelined()
-      updateItem(pipeline, bucket, key, value, k)
-      pipeline.sync()
-    } match {
-      case Failure(ex) =>
-        log.error(s"fail to update $key $value: $ex")
-      case _ =>
-    }
-  }
-
-  private def updateItem(pipeline: Pipeline, bucket: String, key: RankingKey, value: RankingValueMap, k: Int): Unit = {
-    val topSeq = value.map { case (item, rv) =>
-      // jedis client accept only java's double
-      item -> rv.score.asInstanceOf[lang.Double]
-    }.toSeq.sortBy(_._2).takeRight(k)
-    pipeline.zadd(bucket, topSeq.toMap[String, lang.Double])
-    pipeline.zincrby(bucket, value.mapValues(_.increment).values.sum, TOTAL)
-    pipeline.zremrangeByRank(bucket, 0, -(k + 1))
-    // if ttl defined, set expire time to bucket
-    getTTL(key.policyId, key.eq.tq.q).foreach { ttl =>
-      pipeline.expire(bucket, ttl)
-    }
-  }
-
-  override def update(values: Seq[(RankingKey, RankingValueMap)], k: Int): Unit = {
-    values.map { case (key, value) =>
-      (makeBucket(key), key, value)
-    }.groupBy { case (bucket, key, value) =>
-      withRedis.getBucketIdx(bucket)
-    }.foreach { case (idx, seq) =>
-      withRedis.doBlockWithIndex(idx) { jedis =>
-        val pipeline = jedis.pipelined()
-        for {
-          (bucket, key, value) <- seq
-        } {
-          updateItem(pipeline, bucket, key, value, k)
-        }
-        pipeline.sync()
-      } match {
-        case Failure(ex) =>
-          log.error(s"fail to update multi $idx: $ex")
-        case _ =>
-      }
-    }
-  }
-
-  override def delete(key: RankingKey): Unit = {
-    val bucket = makeBucket(key)
-    withRedis.doBlockWithKey(bucket) { jedis =>
-      jedis.del(bucket)
-    } match {
-      case Success(deleted) =>
-        log.info(s"success to delete $key")
-      case Failure(ex) =>
-        log.error(s"fail to delete $key")
-    }
-  }
-
-  override def getTopK(keys: Seq[RankingKey], k: Int): Seq[(RankingKey, RankingResult)] = {
-    keys.map { key =>
-      (makeBucket(key), key)
-    }.groupBy { case (bucket, key) =>
-      withRedis.getBucketIdx(bucket)
-    }.toSeq.par.flatMap { case (idx, seq) =>
-      withRedis.doBlockWithIndex(idx) { jedis =>
-        val pipeline = jedis.pipelined()
-        val keyWithRespLs = {
-          for {
-            (bucket, rankingKey) <- seq
-          } yield {
-            (rankingKey, pipeline.zrevrangeByScoreWithScores(bucket, "+inf", "-inf", 0, k + 1))
-          }
-        }
-        pipeline.sync()
-        for {
-          (rankingKey, resp) <- keyWithRespLs
-        } yield {
-          (rankingKey, resp.get().toSeq.map { t => (t.getElement, t.getScore)})
-        }
-      } match {
-        case Success(keyWithValues) =>
-          for {
-            (rankingKey, values) <- keyWithValues
-          } yield {
-            val result = RankingResult(values.find(_._1 == TOTAL).map(_._2).getOrElse(-1d), values.filter(_._1 != TOTAL).take(k))
-            (rankingKey, result)
-          }
-        case Failure(ex) =>
-          Nil
-      }
-    }
-  }.seq
-
-  override def prepare(policy: Counter): Unit = {
-    // do nothing
-  }
-
-  override def destroy(policy: Counter): Unit = {
-
-  }
-
-  override def ready(policy: Counter): Boolean = {
-    // always return true
-    true
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2counter_core/src/main/scala/s2/counter/core/v2/BytesUtilV2.scala
----------------------------------------------------------------------
diff --git a/s2counter_core/src/main/scala/s2/counter/core/v2/BytesUtilV2.scala b/s2counter_core/src/main/scala/s2/counter/core/v2/BytesUtilV2.scala
deleted file mode 100644
index f839221..0000000
--- a/s2counter_core/src/main/scala/s2/counter/core/v2/BytesUtilV2.scala
+++ /dev/null
@@ -1,91 +0,0 @@
-package s2.counter.core.v2
-
-import org.apache.hadoop.hbase.util._
-import s2.counter.core.TimedQualifier.IntervalUnit
-import s2.counter.core._
-import s2.models.Counter.ItemType
-import s2.util.Hashes
-
-import scala.collection.mutable.ArrayBuffer
-
-/**
- * Created by hsleep([email protected]) on 15. 6. 11..
- */
-object BytesUtilV2 extends BytesUtil {
-  // ExactKey: [hash(1b)][version(1b)][policy(4b)][item(variable)]
-  val BUCKET_BYTE_SIZE = Bytes.SIZEOF_BYTE
-  val VERSION_BYTE_SIZE = Bytes.SIZEOF_BYTE
-  val POLICY_ID_SIZE = Bytes.SIZEOF_INT
-
-  val INTERVAL_SIZE = Bytes.SIZEOF_BYTE
-  val TIMESTAMP_SIZE = Bytes.SIZEOF_LONG
-  val TIMED_QUALIFIER_SIZE = INTERVAL_SIZE + TIMESTAMP_SIZE
-
-  override def getRowKeyPrefix(id: Int): Array[Byte] = {
-    Array(s2.counter.VERSION_2) ++ Bytes.toBytes(id)
-  }
-
-  override def toBytes(key: ExactKeyTrait): Array[Byte] = {
-    val buff = new ArrayBuffer[Byte]
-    // hash byte
-    buff ++= Bytes.toBytes(Hashes.murmur3(key.itemKey)).take(BUCKET_BYTE_SIZE)
-
-    // row key prefix
-    // version + policy id
-    buff ++= getRowKeyPrefix(key.policyId)
-
-    buff ++= {
-      key.itemType match {
-        case ItemType.INT => Bytes.toBytes(key.itemKey.toInt)
-        case ItemType.LONG => Bytes.toBytes(key.itemKey.toLong)
-        case ItemType.STRING | ItemType.BLOB => Bytes.toBytes(key.itemKey)
-      }
-    }
-    buff.toArray
-  }
-
-  override def toBytes(eq: ExactQualifier): Array[Byte] = {
-    val len = eq.dimKeyValues.map { case (k, v) => k.length + 2 + v.length + 2 }.sum
-    val pbr = new SimplePositionedMutableByteRange(len)
-    for {
-      v <- ExactQualifier.makeSortedDimension(eq.dimKeyValues)
-    } {
-      OrderedBytes.encodeString(pbr, v, Order.ASCENDING)
-    }
-    toBytes(eq.tq) ++ pbr.getBytes
-  }
-
-  override def toBytes(tq: TimedQualifier): Array[Byte] = {
-    val pbr = new SimplePositionedMutableByteRange(INTERVAL_SIZE + 2 + TIMESTAMP_SIZE + 1)
-    OrderedBytes.encodeString(pbr, tq.q.toString, Order.ASCENDING)
-    OrderedBytes.encodeInt64(pbr, tq.ts, Order.DESCENDING)
-    pbr.getBytes
-  }
-
-  private def decodeString(pbr: PositionedByteRange): Stream[String] = {
-    if (pbr.getRemaining > 0) {
-      Stream.cons(OrderedBytes.decodeString(pbr), decodeString(pbr))
-    }
-    else {
-      Stream.empty
-    }
-  }
-
-  override def toExactQualifier(bytes: Array[Byte]): ExactQualifier = {
-    val pbr = new SimplePositionedByteRange(bytes)
-    ExactQualifier(toTimedQualifier(pbr), {
-      val seqStr = decodeString(pbr).toSeq
-      val (keys, values) = seqStr.splitAt(seqStr.length / 2)
-      keys.zip(values).toMap
-    })
-  }
-
-  override def toTimedQualifier(bytes: Array[Byte]): TimedQualifier = {
-    val pbr = new SimplePositionedByteRange(bytes)
-    toTimedQualifier(pbr)
-  }
-
-  def toTimedQualifier(pbr: PositionedByteRange): TimedQualifier = {
-    TimedQualifier(IntervalUnit.withName(OrderedBytes.decodeString(pbr)), OrderedBytes.decodeInt64(pbr))
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2counter_core/src/main/scala/s2/counter/core/v2/ExactStorageGraph.scala
----------------------------------------------------------------------
diff --git a/s2counter_core/src/main/scala/s2/counter/core/v2/ExactStorageGraph.scala b/s2counter_core/src/main/scala/s2/counter/core/v2/ExactStorageGraph.scala
deleted file mode 100644
index 2b225c8..0000000
--- a/s2counter_core/src/main/scala/s2/counter/core/v2/ExactStorageGraph.scala
+++ /dev/null
@@ -1,346 +0,0 @@
-package s2.counter.core.v2
-
-import com.kakao.s2graph.core.mysqls.Label
-import com.typesafe.config.Config
-import org.apache.http.HttpStatus
-import org.slf4j.LoggerFactory
-import play.api.libs.json._
-import s2.config.S2CounterConfig
-import s2.counter.core.ExactCounter.ExactValueMap
-import s2.counter.core._
-import s2.models.Counter
-import s2.util.CartesianProduct
-
-import scala.concurrent.duration._
-import scala.concurrent.{Await, ExecutionContext, Future}
-
-/**
- * Created by hsleep([email protected]) on 15. 8. 19..
- */
-object ExactStorageGraph {
-  case class RespGraph(success: Boolean, result: Long)
-  implicit val respGraphFormat = Json.format[RespGraph]
-
-  // using play-ws without play app
-  private val builder = new com.ning.http.client.AsyncHttpClientConfig.Builder()
-  private val wsClient = new play.api.libs.ws.ning.NingWSClient(builder.build)
-}
-
-case class ExactStorageGraph(config: Config) extends ExactStorage {
-  private val log = LoggerFactory.getLogger(this.getClass)
-  private val s2config = new S2CounterConfig(config)
-
-  private val SERVICE_NAME = "s2counter"
-  private val COLUMN_NAME = "bucket"
-  private val labelPostfix = "_counts"
-
-  val s2graphUrl = s2config.GRAPH_URL
-  val s2graphReadOnlyUrl = s2config.GRAPH_READONLY_URL
-  val graphOp = new GraphOperation(config)
-
-  import ExactStorageGraph._
-
-  override def update(policy: Counter, counts: Seq[(ExactKeyTrait, ExactValueMap)]): Map[ExactKeyTrait, ExactValueMap] = {
-    import scala.concurrent.ExecutionContext.Implicits.global
-
-    val (keyWithEq, reqJsLs) = toIncrementCountRequests(policy, counts).unzip(x => ((x._1, x._2), x._3))
-
-    val future = wsClient.url(s"$s2graphUrl/graphs/edges/incrementCount").post(Json.toJson(reqJsLs)).map { resp =>
-      resp.status match {
-        case HttpStatus.SC_OK =>
-          val respSeq = resp.json.as[Seq[RespGraph]]
-
-          val keyWithEqResult = {
-            for {
-              ((key, eq), RespGraph(success, result)) <- keyWithEq.zip(respSeq)
-            } yield {
-              (key, (eq, result))
-            }
-          }.groupBy(_._1).mapValues{ seq => seq.map(_._2).toMap }
-          keyWithEqResult
-        case _ =>
-          throw new RuntimeException(s"update failed: $policy $counts")
-      }
-    }
-    Await.result(future, 10 second)
-  }
-
-  def delete(policy: Counter, keys: Seq[ExactKeyTrait]): Unit = {
-
-  }
-
-  private def toIncrementCountRequests(policy: Counter,
-                                       counts: Seq[(ExactKeyTrait, ExactValueMap)])
-  : Seq[(ExactKeyTrait, ExactQualifier, JsValue)] = {
-    val labelName = policy.action + labelPostfix
-    val timestamp = System.currentTimeMillis()
-    for {
-      (exactKey, values) <- counts
-      (eq, value) <- values
-    } yield {
-      val from = exactKey.itemKey
-      val to = eq.dimension
-      val json = Json.obj(
-        "timestamp" -> timestamp,
-        "operation" -> "incrementCount",
-        "from" -> from,
-        "to" -> to,
-        "label" -> labelName,
-        "props" -> Json.obj(
-          "_count" -> value,
-          "time_unit" -> eq.tq.q.toString,
-          "time_value" -> eq.tq.ts
-        )
-      )
-      (exactKey, eq, json)
-    }
-  }
-
-  override def get(policy: Counter,
-                   items: Seq[String],
-                   timeRange: Seq[(TimedQualifier, TimedQualifier)],
-                   dimQuery: Map[String, Set[String]])
-                  (implicit ex: ExecutionContext): Future[Seq[FetchedCountsGrouped]] = {
-    val labelName = policy.action + labelPostfix
-    val label = Label.findByName(labelName).get
-//    val label = labelModel.findByName(labelName).get
-
-    val ids = Json.toJson(items)
-
-    val dimensions = {
-      for {
-        values <- CartesianProduct(dimQuery.values.map(ss => ss.toList).toList)
-      } yield {
-        dimQuery.keys.zip(values).toMap
-      }
-    }
-
-    val stepJsLs = {
-      for {
-        (tqFrom, tqTo) <- timeRange
-        dimension <- dimensions
-      } yield {
-        val eqFrom = ExactQualifier(tqFrom, dimension)
-        val eqTo = ExactQualifier(tqTo, dimension)
-        val intervalJs =
-          s"""
-            |{
-            |  "from": {
-            |    "_to": "${eqFrom.dimension}",
-            |    "time_unit": "${eqFrom.tq.q}",
-            |    "time_value": ${eqFrom.tq.ts}
-            |  },
-            |  "to": {
-            |    "_to": "${eqTo.dimension}",
-            |    "time_unit": "${eqTo.tq.q}",
-            |    "time_value": ${eqTo.tq.ts + 1}
-            |  }
-            |}
-          """.stripMargin
-        val stepJs =
-          s"""
-            |{
-            |  "direction": "out",
-            |  "limit": -1,
-            |  "duplicate": "raw",
-            |  "label": "$labelName",
-            |  "interval": $intervalJs
-            |}
-           """.stripMargin
-        stepJs
-      }
-    }
-
-    val reqJsStr =
-      s"""
-        |{
-        |  "srcVertices": [
-    {"serviceName": "${policy.service}", "columnName": "${label.srcColumnName}", "ids": $ids}
-        |  ],
-        |  "steps": [
-        |    {
-        |      "step": [
-        |        ${stepJsLs.mkString(",")}
-        |      ]
-        |    }
-        |  ]
-        |}
-      """.stripMargin
-
-    val reqJs = Json.parse(reqJsStr)
-//    log.warn(s"query: ${reqJs.toString()}")
-
-    wsClient.url(s"$s2graphReadOnlyUrl/graphs/getEdges").post(reqJs).map { resp =>
-      resp.status match {
-        case HttpStatus.SC_OK =>
-          val respJs = resp.json
-//          println(respJs)
-          val keyWithValues = (respJs \ "results").as[Seq[JsValue]].map { result =>
-//            println(s"result: $result")
-            resultToExactKeyValues(policy, result)
-          }.groupBy(_._1).mapValues(seq => seq.map(_._2).toMap.groupBy { case (eq, v) => (eq.tq.q, eq.dimKeyValues) })
-          for {
-            (k, v) <- keyWithValues.toSeq
-          } yield {
-            FetchedCountsGrouped(k, v)
-          }
-        case n: Int =>
-          log.warn(s"getEdges status($n): $reqJsStr")
-//          println(s"getEdges status($n): $reqJsStr")
-          Nil
-      }
-    }
-  }
-
-  private def resultToExactKeyValues(policy: Counter, result: JsValue): (ExactKeyTrait, (ExactQualifier, Long)) = {
-    val from = result \ "from" match {
-      case s: JsString => s.as[String]
-      case n: JsNumber => n.as[Long].toString
-      case x: JsValue => throw new RuntimeException(s"$x's type must be string or number")
-    }
-    val dimension = (result \ "to").as[String]
-    val props = result \ "props"
-    val count = (props \ "_count").as[Long]
-    val timeUnit = (props \ "time_unit").as[String]
-    val timeValue = (props \ "time_value").as[Long]
-    (ExactKey(policy, from, checkItemType = true), (ExactQualifier(TimedQualifier(timeUnit, timeValue), dimension), count))
-  }
-
-  private def getInner(policy: Counter, key: ExactKeyTrait, eqs: Seq[ExactQualifier])
-                      (implicit ex: ExecutionContext): Future[Seq[FetchedCounts]] = {
-    val labelName = policy.action + labelPostfix
-
-    Label.findByName(labelName) match {
-      case Some(label) =>
-
-        val src = Json.obj("serviceName" -> policy.service, "columnName" -> label.srcColumnName, "id" -> key.itemKey)
-        val step = {
-          val stepLs = {
-            for {
-              eq <- eqs
-            } yield {
-              val from = Json.obj("_to" -> eq.dimension, "time_unit" -> eq.tq.q.toString, "time_value" -> eq.tq.ts)
-              val to = Json.obj("_to" -> eq.dimension, "time_unit" -> eq.tq.q.toString, "time_value" -> eq.tq.ts)
-              val interval = Json.obj("from" -> from, "to" -> to)
-              Json.obj("limit" -> 1, "label" -> labelName, "interval" -> interval)
-            }
-          }
-          Json.obj("step" -> stepLs)
-        }
-        val query = Json.obj("srcVertices" -> Json.arr(src), "steps" -> Json.arr(step))
-        //    println(s"query: ${query.toString()}")
-
-        wsClient.url(s"$s2graphReadOnlyUrl/graphs/getEdges").post(query).map { resp =>
-          resp.status match {
-            case HttpStatus.SC_OK =>
-              val respJs = resp.json
-              val keyWithValues = (respJs \ "results").as[Seq[JsValue]].map { result =>
-                resultToExactKeyValues(policy, result)
-              }.groupBy(_._1).mapValues(seq => seq.map(_._2).toMap)
-              for {
-                (key, eqWithValues) <- keyWithValues.toSeq
-              } yield {
-                FetchedCounts(key, eqWithValues)
-              }
-            case _ =>
-              Nil
-          }
-        }
-      case None =>
-        Future.successful(Nil)
-    }
-  }
-
-  // for query exact qualifier
-  override def get(policy: Counter, queries: Seq[(ExactKeyTrait, Seq[ExactQualifier])])
-                  (implicit ex: ExecutionContext): Future[Seq[FetchedCounts]] = {
-    val futures = {
-      for {
-        (key, eqs) <- queries
-      } yield {
-//        println(s"$key $eqs")
-        getInner(policy, key, eqs)
-      }
-    }
-    Future.sequence(futures).map(_.flatten)
-  }
-
-  override def getBlobValue(policy: Counter, blobId: String): Option[String] = {
-    throw new RuntimeException("unsupported getBlobValue operation")
-  }
-
-  override def insertBlobValue(policy: Counter, keys: Seq[BlobExactKey]): Seq[Boolean] = {
-    throw new RuntimeException("unsupported insertBlobValue operation")
-  }
-
-  private def existsLabel(policy: Counter, useCache: Boolean = true): Boolean = {
-    val action = policy.action
-    val counterLabelName = action + labelPostfix
-
-    Label.findByName(counterLabelName, useCache).nonEmpty
-  }
-
-  override def prepare(policy: Counter): Unit = {
-    val service = policy.service
-    val action = policy.action
-
-    val graphLabel = Label.findByName(action)
-    if (graphLabel.isEmpty) {
-      throw new Exception(s"label not found. $service.$action")
-    }
-
-    if (!existsLabel(policy, useCache = false)) {
-      val label = Label.findByName(action, useCache = false).get
-
-      val counterLabelName = action + labelPostfix
-      val defaultJson =
-        s"""
-           |{
-           |  "label": "$counterLabelName",
-           |  "srcServiceName": "$service",
-           |  "srcColumnName": "${label.tgtColumnName}",
-           |  "srcColumnType": "${label.tgtColumnType}",
-           |  "tgtServiceName": "$SERVICE_NAME",
-           |  "tgtColumnName": "$COLUMN_NAME",
-           |  "tgtColumnType": "string",
-           |  "indices": [
-           |    {"name": "time", "propNames": ["_to", "time_unit", "time_value"]}
-           |  ],
-           |  "props": [
-           |    {"name": "time_unit", "dataType": "string", "defaultValue": ""},
-           |    {"name": "time_value", "dataType": "long", "defaultValue": 0}
-           |  ],
-           |  "hTableName": "${policy.hbaseTable.get}"
-           |}
-        """.stripMargin
-      val json = policy.dailyTtl.map(ttl => ttl * 24 * 60 * 60) match {
-        case Some(ttl) =>
-          Json.parse(defaultJson).as[JsObject] + ("hTableTTL" -> Json.toJson(ttl))
-        case None =>
-          Json.parse(defaultJson)
-      }
-
-      graphOp.createLabel(json)
-    }
-  }
-
-  override def destroy(policy: Counter): Unit = {
-    val action = policy.action
-
-    if (existsLabel(policy, useCache = false)) {
-      val counterLabelName = action + labelPostfix
-
-      graphOp.deleteLabel(counterLabelName)
-    }
-  }
-
-  override def ready(policy: Counter): Boolean = {
-    existsLabel(policy)
-  }
-
-  // for range query
-  override def get(policy: Counter, items: Seq[String], timeRange: Seq[(TimedQualifier, TimedQualifier)])
-                  (implicit ec: ExecutionContext): Future[Seq[FetchedCounts]] = {
-    throw new NotImplementedError("Not implemented")
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2counter_core/src/main/scala/s2/counter/core/v2/GraphOperation.scala
----------------------------------------------------------------------
diff --git a/s2counter_core/src/main/scala/s2/counter/core/v2/GraphOperation.scala b/s2counter_core/src/main/scala/s2/counter/core/v2/GraphOperation.scala
deleted file mode 100644
index 0f43a52..0000000
--- a/s2counter_core/src/main/scala/s2/counter/core/v2/GraphOperation.scala
+++ /dev/null
@@ -1,52 +0,0 @@
-package s2.counter.core.v2
-
-import com.typesafe.config.Config
-import org.apache.http.HttpStatus
-import org.slf4j.LoggerFactory
-import play.api.libs.json.{JsObject, JsValue, Json}
-import s2.config.S2CounterConfig
-
-import scala.concurrent.Await
-import scala.concurrent.duration._
-
-/**
-  * Created by hsleep([email protected]) on 2015. 11. 10..
-  */
-class GraphOperation(config: Config) {
-  // using play-ws without play app
-  private val builder = new com.ning.http.client.AsyncHttpClientConfig.Builder()
-  private val wsClient = new play.api.libs.ws.ning.NingWSClient(builder.build)
-  private val s2config = new S2CounterConfig(config)
-  val s2graphUrl = s2config.GRAPH_URL
-  private[counter] val log = LoggerFactory.getLogger(this.getClass)
-
-  import scala.concurrent.ExecutionContext.Implicits.global
-
-  def createLabel(json: JsValue): Boolean = {
-    // fix counter label's schemaVersion
-    val newJson = json.as[JsObject] ++ Json.obj("schemaVersion" -> "v2")
-    val future = wsClient.url(s"$s2graphUrl/graphs/createLabel").post(newJson).map { resp =>
-      resp.status match {
-        case HttpStatus.SC_OK =>
-          true
-        case _ =>
-          throw new RuntimeException(s"failed createLabel. errCode: ${resp.status} body: ${resp.body} query: $json")
-      }
-    }
-
-    Await.result(future, 10 second)
-  }
-
-  def deleteLabel(label: String): Boolean = {
-    val future = wsClient.url(s"$s2graphUrl/graphs/deleteLabel/$label").put("").map { resp =>
-      resp.status match {
-        case HttpStatus.SC_OK =>
-          true
-        case _ =>
-          throw new RuntimeException(s"failed deleteLabel. errCode: ${resp.status} body: ${resp.body}")
-      }
-    }
-
-    Await.result(future, 10 second)
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2counter_core/src/main/scala/s2/counter/core/v2/RankingStorageGraph.scala
----------------------------------------------------------------------
diff --git a/s2counter_core/src/main/scala/s2/counter/core/v2/RankingStorageGraph.scala b/s2counter_core/src/main/scala/s2/counter/core/v2/RankingStorageGraph.scala
deleted file mode 100644
index 18d7eda..0000000
--- a/s2counter_core/src/main/scala/s2/counter/core/v2/RankingStorageGraph.scala
+++ /dev/null
@@ -1,399 +0,0 @@
-package s2.counter.core.v2
-
-import com.kakao.s2graph.core.GraphUtil
-import com.kakao.s2graph.core.mysqls.Label
-import com.typesafe.config.Config
-import org.apache.commons.httpclient.HttpStatus
-import org.slf4j.LoggerFactory
-import play.api.libs.json.{JsObject, JsString, JsValue, Json}
-import s2.config.S2CounterConfig
-import s2.counter.core.RankingCounter.RankingValueMap
-import s2.counter.core.{RankingKey, RankingResult, RankingStorage}
-import s2.models.{Counter, CounterModel}
-import s2.util.{CollectionCache, CollectionCacheConfig}
-
-import scala.concurrent.duration._
-import scala.concurrent.{Await, Future}
-import scala.util.hashing.MurmurHash3
-
-/**
- * Created by shon on 7/28/15.
- */
-object RankingStorageGraph {
-  // using play-ws without play app
-  private val builder = new com.ning.http.client.AsyncHttpClientConfig.Builder()
-  private val wsClient = new play.api.libs.ws.ning.NingWSClient(builder.build)
-}
-
-class RankingStorageGraph(config: Config) extends RankingStorage {
-  import RankingStorageGraph._
-
-  private[counter] val log = LoggerFactory.getLogger(this.getClass)
-  private val s2config = new S2CounterConfig(config)
-
-  private val BUCKET_SHARD_COUNT = 53
-  private val SERVICE_NAME = "s2counter"
-  private val BUCKET_COLUMN_NAME = "bucket"
-  private val counterModel = new CounterModel(config)
-  private val labelPostfix = "_topK"
-
-  val s2graphUrl = s2config.GRAPH_URL
-  val s2graphReadOnlyUrl = s2config.GRAPH_READONLY_URL
-
-  val prepareCache = new CollectionCache[Option[Boolean]](CollectionCacheConfig(10000, 600))
-  val graphOp = new GraphOperation(config)
-  import scala.concurrent.ExecutionContext.Implicits.global
-
-  private def makeBucketKey(rankingKey: RankingKey): String = {
-    val eq = rankingKey.eq
-    val tq = eq.tq
-    s"${tq.q}.${tq.ts}.${eq.dimension}"
-  }
-
-  // "", "age.32", "age.gender.32.M"
-  private def makeBucketShardKey(shardIdx: Int, rankingKey: RankingKey): String = {
-    s"$shardIdx.${makeBucketKey(rankingKey)}"
-  }
-
-  /**
-   * indexProps: ["time_unit", "time_value", "score"]
-   */
-  override def getTopK(key: RankingKey, k: Int): Option[RankingResult] = {
-    getTopK(Seq(key), k).headOption.map(_._2)
-  }
-
-  override def getTopK(keys: Seq[RankingKey], k: Int): Seq[(RankingKey, RankingResult)] = {
-    val futures = for {
-      key <- keys
-    } yield {
-      getEdges(key).map { edges =>
-        key -> RankingResult(0d, toWithScoreLs(edges).take(k))
-      }
-    }
-
-    Await.result(Future.sequence(futures), 10 seconds)
-  }
-
-  override def update(key: RankingKey, value: RankingValueMap, k: Int): Unit = {
-    update(Seq((key, value)), k)
-  }
-
-  override def update(values: Seq[(RankingKey, RankingValueMap)], k: Int): Unit = {
-    val futures = {
-      for {
-        (key, value) <- values
-      } yield {
-        // prepare dimension bucket edge
-        checkAndPrepareDimensionBucket(key)
-
-        val future = getEdges(key, "raw").flatMap { edges =>
-          val prevRankingSeq = toWithScoreLs(edges)
-          val prevRankingMap: Map[String, Double] = prevRankingSeq.groupBy(_._1).map(_._2.sortBy(-_._2).head)
-          val currentRankingMap: Map[String, Double] = value.mapValues(_.score)
-          val mergedRankingSeq = (prevRankingMap ++ currentRankingMap).toSeq.sortBy(-_._2).take(k)
-          val mergedRankingMap = mergedRankingSeq.toMap
-
-          val bucketRankingSeq = mergedRankingSeq.groupBy { case (itemId, score) =>
-            // 0-index
-            GraphUtil.transformHash(MurmurHash3.stringHash(itemId)) % BUCKET_SHARD_COUNT
-          }.map { case (shardIdx, groupedRanking) =>
-            shardIdx -> groupedRanking.filter { case (itemId, _) => currentRankingMap.contains(itemId) }
-          }.toSeq
-
-          insertBulk(key, bucketRankingSeq).flatMap { _ =>
-            val duplicatedItems = prevRankingMap.filterKeys(s => currentRankingMap.contains(s))
-            val cutoffItems = prevRankingMap.filterKeys(s => !mergedRankingMap.contains(s))
-            val deleteItems = duplicatedItems ++ cutoffItems
-
-            val keyWithEdgesLs = prevRankingSeq.map(_._1).zip(edges)
-            val deleteEdges = keyWithEdgesLs.filter{ case (s, _) => deleteItems.contains(s) }.map(_._2)
-
-            deleteAll(deleteEdges)
-          }
-        }
-
-        future
-      }
-    }
-
-    Await.result(Future.sequence(futures), 10 seconds)
-  }
-
-  private def toWithScoreLs(edges: List[JsValue]): List[(String, Double)] = {
-    for {
-      edgeJson <- edges
-      to = (edgeJson \ "to").as[JsValue]
-      score = (edgeJson \ "score").as[JsValue].toString().toDouble
-    } yield {
-      val toValue = to match {
-        case s: JsString => s.as[String]
-        case _ => to.toString()
-      }
-      toValue -> score
-    }
-  }
-
-  private def insertBulk(key: RankingKey, newRankingSeq: Seq[(Int, Seq[(String, Double)])]): Future[Boolean] = {
-    val labelName = counterModel.findById(key.policyId).get.action + labelPostfix
-    val timestamp: Long = System.currentTimeMillis
-    val payload = Json.toJson {
-      for {
-        (shardIdx, rankingSeq) <- newRankingSeq
-        (itemId, score) <- rankingSeq
-      } yield {
-        val srcId = makeBucketShardKey(shardIdx, key)
-        Json.obj(
-          "timestamp" -> timestamp,
-          "from" -> srcId,
-          "to" -> itemId,
-          "label" -> labelName,
-          "props" -> Json.obj(
-            "time_unit" -> key.eq.tq.q.toString,
-            "time_value" -> key.eq.tq.ts,
-            "date_time" -> key.eq.tq.dateTime,
-            "score" -> score
-          )
-        )
-      }
-    }
-
-    wsClient.url(s"$s2graphUrl/graphs/edges/insertBulk").post(payload).map { resp =>
-      resp.status match {
-        case HttpStatus.SC_OK =>
-          true
-        case _ =>
-          throw new RuntimeException(s"failed insertBulk. errCode: ${resp.status}, body: ${resp.body}, query: $payload")
-      }
-    }
-  }
-
-  private def deleteAll(edges: List[JsValue]): Future[Boolean] = {
-    // /graphs/edges/delete
-    val futures = {
-      for {
-        groupedEdges <- edges.grouped(50)
-      } yield {
-        val payload = Json.toJson(groupedEdges)
-        wsClient.url(s"$s2graphUrl/graphs/edges/delete").post(payload).map { resp =>
-          resp.status match {
-            case HttpStatus.SC_OK =>
-              true
-            case _ =>
-              log.error(s"failed delete. errCode: ${resp.status}, body: ${resp.body}, query: $payload")
-              false
-          }
-        }
-      }
-    }.toSeq
-
-    Future.sequence(futures).map { seq =>
-      seq.forall(x => x)
-    }
-  }
-
-  /** select and delete */
-  override def delete(key: RankingKey): Unit = {
-    val future = getEdges(key).flatMap { edges =>
-      deleteAll(edges)
-    }
-    Await.result(future, 10 second)
-  }
-
-  private def getEdges(key: RankingKey, duplicate: String="first"): Future[List[JsValue]] = {
-    val labelName = counterModel.findById(key.policyId).get.action + labelPostfix
-
-    val ids = {
-      (0 until BUCKET_SHARD_COUNT).map { shardIdx =>
-        s""""${makeBucketShardKey(shardIdx, key)}""""
-      }
-    }.mkString(",")
-
-    val strJs =
-      s"""
-         |{
-         |    "srcVertices": [
-         |        {
-         |            "serviceName": "$SERVICE_NAME",
-         |            "columnName": "$BUCKET_COLUMN_NAME",
-         |            "ids": [$ids]
-         |        }
-         |    ],
-         |    "steps": [
-         |        {
-         |            "step": [
-         |                {
-         |                    "label": "$labelName",
-         |                    "duplicate": "$duplicate",
-         |                    "direction": "out",
-         |                    "offset": 0,
-         |                    "limit": -1,
-         |                    "interval": {
-         |                      "from": {"time_unit": "${key.eq.tq.q.toString}", "time_value": ${key.eq.tq.ts}},
-         |                      "to": {"time_unit": "${key.eq.tq.q.toString}", "time_value": ${key.eq.tq.ts}}
-         |                    },
-         |                    "scoring": {"score": 1}
-         |                }
-         |            ]
-         |        }
-         |    ]
-         |}
-       """.stripMargin
-    log.debug(strJs)
-
-    val payload = Json.parse(strJs)
-    wsClient.url(s"$s2graphReadOnlyUrl/graphs/getEdges").post(payload).map { resp =>
-      resp.status match {
-        case HttpStatus.SC_OK =>
-          (resp.json \ "results").asOpt[List[JsValue]].getOrElse(Nil)
-        case _ =>
-          throw new RuntimeException(s"failed getEdges. errCode: ${resp.status}, body: ${resp.body}, query: $payload")
-      }
-    }
-  }
-
-  private def existsLabel(policy: Counter, useCache: Boolean = true): Boolean = {
-    val action = policy.action
-    val counterLabelName = action + labelPostfix
-
-    Label.findByName(counterLabelName, useCache).nonEmpty
-  }
-
-  private def checkAndPrepareDimensionBucket(rankingKey: RankingKey): Boolean = {
-    val dimension = rankingKey.eq.dimension
-    val bucketKey = makeBucketKey(rankingKey)
-    val labelName = "s2counter_topK_bucket"
-
-    val prepared = prepareCache.withCache(s"$dimension:$bucketKey") {
-      val checkReqJs = Json.arr(
-        Json.obj(
-          "label" -> labelName,
-          "direction" -> "out",
-          "from" -> dimension,
-          "to" -> makeBucketShardKey(BUCKET_SHARD_COUNT - 1, rankingKey)
-        )
-      )
-
-      val future = wsClient.url(s"$s2graphReadOnlyUrl/graphs/checkEdges").post(checkReqJs).map { resp =>
-        resp.status match {
-          case HttpStatus.SC_OK =>
-            val checkRespJs = resp.json
-            if (checkRespJs.as[Seq[JsValue]].nonEmpty) {
-              true
-            } else {
-              false
-            }
-          case _ =>
-            // throw exception
-            throw new Exception(s"failed checkEdges. ${resp.body} ${resp.status}")
-        }
-      }.flatMap {
-        case true => Future.successful(Some(true))
-        case false =>
-          val insertReqJsLs = {
-            for {
-              i <- 0 until BUCKET_SHARD_COUNT
-            } yield {
-              Json.obj(
-                "timestamp" -> rankingKey.eq.tq.ts,
-                "from" -> dimension,
-                "to" -> makeBucketShardKey(i, rankingKey),
-                "label" -> labelName,
-                "props" -> Json.obj(
-                  "time_unit" -> rankingKey.eq.tq.q.toString,
-                  "date_time" -> rankingKey.eq.tq.dateTime
-                )
-              )
-            }
-          }
-          wsClient.url(s"$s2graphUrl/graphs/edges/insert").post(Json.toJson(insertReqJsLs)).map { resp =>
-            resp.status match {
-              case HttpStatus.SC_OK =>
-                Some(true)
-              case _ =>
-                // throw exception
-                throw new Exception(s"failed insertEdges. ${resp.body} ${resp.status}")
-            }
-          }
-      }.recover {
-        case e: Exception =>
-          None
-      }
-
-      Await.result(future, 10 second)
-    }
-    prepared.getOrElse(false)
-  }
-
-  override def prepare(policy: Counter): Unit = {
-    val service = policy.service
-    val action = policy.action
-
-    val graphLabel = {
-      policy.rateActionId match {
-        case Some(rateId) =>
-          counterModel.findById(rateId, useCache = false).flatMap { ratePolicy =>
-            Label.findByName(ratePolicy.action)
-          }
-        case None =>
-          Label.findByName(action)
-      }
-    }
-    if (graphLabel.isEmpty) {
-      throw new Exception(s"label not found. $service.$action")
-    }
-
-    if (!existsLabel(policy, useCache = false)) {
-      // find input label to specify target column
-      val inputLabelName = policy.rateActionId.flatMap { id =>
-        counterModel.findById(id, useCache = false).map(_.action)
-      }.getOrElse(action)
-      val label = graphLabel.get
-
-      val counterLabelName = action + labelPostfix
-      val defaultJson =
-        s"""
-           |{
-           |  "label": "$counterLabelName",
-           |  "srcServiceName": "$SERVICE_NAME",
-           |  "srcColumnName": "$BUCKET_COLUMN_NAME",
-           |  "srcColumnType": "string",
-           |  "tgtServiceName": "$service",
-           |  "tgtColumnName": "${label.tgtColumnName}",
-           |  "tgtColumnType": "${label.tgtColumnType}",
-           |  "indices": [
-           |    {"name": "time", "propNames": ["time_unit", "time_value", "score"]}
-           |  ],
-           |  "props": [
-           |    {"name": "time_unit", "dataType": "string", "defaultValue": ""},
-           |    {"name": "time_value", "dataType": "long", "defaultValue": 0},
-           |    {"name": "date_time", "dataType": "long", "defaultValue": 0},
-           |    {"name": "score", "dataType": "float", "defaultValue": 0.0}
-           |  ],
-           |  "hTableName": "${policy.hbaseTable.get}"
-           |}
-         """.stripMargin
-      val json = policy.dailyTtl.map(ttl => ttl * 24 * 60 * 60) match {
-        case Some(ttl) =>
-          Json.parse(defaultJson).as[JsObject] + ("hTableTTL" -> Json.toJson(ttl))
-        case None =>
-          Json.parse(defaultJson)
-      }
-
-      graphOp.createLabel(json)
-    }
-  }
-
-  override def destroy(policy: Counter): Unit = {
-    val action = policy.action
-
-    if (existsLabel(policy, useCache = false)) {
-      val counterLabelName = action + labelPostfix
-
-      graphOp.deleteLabel(counterLabelName)
-    }
-  }
-
-  override def ready(policy: Counter): Boolean = {
-    existsLabel(policy)
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2counter_core/src/main/scala/s2/counter/decay/DecayFormula.scala
----------------------------------------------------------------------
diff --git a/s2counter_core/src/main/scala/s2/counter/decay/DecayFormula.scala b/s2counter_core/src/main/scala/s2/counter/decay/DecayFormula.scala
deleted file mode 100644
index 66e1b93..0000000
--- a/s2counter_core/src/main/scala/s2/counter/decay/DecayFormula.scala
+++ /dev/null
@@ -1,9 +0,0 @@
-package s2.counter.decay
-
-/**
- * Created by hsleep([email protected]) on 15. 6. 26..
- */
-trait DecayFormula {
-  def apply(value: Double, millis: Long): Double
-  def apply(value: Double, seconds: Int): Double
-}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2counter_core/src/main/scala/s2/counter/decay/ExpDecayFormula.scala
----------------------------------------------------------------------
diff --git a/s2counter_core/src/main/scala/s2/counter/decay/ExpDecayFormula.scala b/s2counter_core/src/main/scala/s2/counter/decay/ExpDecayFormula.scala
deleted file mode 100644
index 6de9f69..0000000
--- a/s2counter_core/src/main/scala/s2/counter/decay/ExpDecayFormula.scala
+++ /dev/null
@@ -1,28 +0,0 @@
-package s2.counter.decay
-
-/**
- * Created by hsleep([email protected]) on 15. 6. 26..
- */
-case class ExpDecayFormula(halfLifeInMillis: Double) extends DecayFormula {
-  val decayRate = - Math.log(2) / halfLifeInMillis
-
-  override def apply(value: Double, millis: Long): Double = {
-    value * Math.pow(Math.E, decayRate * millis)
-  }
-
-  override def apply(value: Double, seconds: Int): Double = {
-    apply(value, seconds * 1000L)
-  }
-}
-
-object ExpDecayFormula {
-  @deprecated("do not use. just experimental", "0.14")
-  def byWindowTime(windowInMillis: Long, pct: Double): ExpDecayFormula = {
-    val halfLife = windowInMillis * Math.log(0.5) / Math.log(pct)
-    ExpDecayFormula(halfLife)
-  }
-
-  def byMeanLifeTime(millis: Long): ExpDecayFormula = {
-    ExpDecayFormula(millis * Math.log(2))
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2counter_core/src/main/scala/s2/counter/package.scala
----------------------------------------------------------------------
diff --git a/s2counter_core/src/main/scala/s2/counter/package.scala b/s2counter_core/src/main/scala/s2/counter/package.scala
deleted file mode 100644
index 40a9e41..0000000
--- a/s2counter_core/src/main/scala/s2/counter/package.scala
+++ /dev/null
@@ -1,11 +0,0 @@
-package s2
-
-/**
- * Created by hsleep([email protected]) on 15. 5. 22..
- */
-package object counter {
-  val VERSION_1: Byte = 1
-  val VERSION_2: Byte = 2
-
-  case class MethodNotSupportedException(message: String, cause: Throwable = null) extends Exception(message, cause)
-}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2counter_core/src/main/scala/s2/helper/CounterAdmin.scala
----------------------------------------------------------------------
diff --git a/s2counter_core/src/main/scala/s2/helper/CounterAdmin.scala b/s2counter_core/src/main/scala/s2/helper/CounterAdmin.scala
deleted file mode 100644
index 3cf9181..0000000
--- a/s2counter_core/src/main/scala/s2/helper/CounterAdmin.scala
+++ /dev/null
@@ -1,114 +0,0 @@
-package s2.helper
-
-import com.kakao.s2graph.core.Graph
-import com.kakao.s2graph.core.mysqls.Label
-import com.typesafe.config.Config
-import play.api.libs.json.Json
-import s2.config.S2CounterConfig
-import s2.counter.core.v1.{ExactStorageAsyncHBase, RankingStorageRedis}
-import s2.counter.core.v2.{ExactStorageGraph, GraphOperation, RankingStorageGraph}
-import s2.counter.core.{ExactCounter, RankingCounter}
-import s2.models.{Counter, CounterModel}
-
-import scala.util.Try
-
-/**
-  * Created by hsleep([email protected]) on 2015. 11. 11..
-  */
-class CounterAdmin(config: Config) {
-  val s2config = new S2CounterConfig(config)
-  val counterModel = new CounterModel(config)
-  val graphOp = new GraphOperation(config)
-  val s2graph = new Graph(config)(scala.concurrent.ExecutionContext.global)
-  val storageManagement = new com.kakao.s2graph.core.Management(s2graph)
-
-  def setupCounterOnGraph(): Unit = {
-    // create s2counter service
-    val service = "s2counter"
-    storageManagement.createService(service, s2config.HBASE_ZOOKEEPER_QUORUM, s"$service-${config.getString("phase")}", 1, None, "gz")
-    // create bucket label
-    val label = "s2counter_topK_bucket"
-    if (Label.findByName(label, useCache = false).isEmpty) {
-      val strJs =
-        s"""
-           |{
-           |  "label": "$label",
-           |  "srcServiceName": "s2counter",
-           |  "srcColumnName": "dimension",
-           |  "srcColumnType": "string",
-           |  "tgtServiceName": "s2counter",
-           |  "tgtColumnName": "bucket",
-           |  "tgtColumnType": "string",
-           |  "indices": [
-           |    {"name": "time", "propNames": ["time_unit", "date_time"]}
-           |   ],
-           |   "props": [
-           |      {"name": "time_unit", "dataType": "string", "defaultValue": ""},
-           |      {"name": "date_time", "dataType": "long", "defaultValue": 0}
-           |   ],
-           |  "hTableName": "s2counter_60",
-           |  "hTableTTL": 5184000
-           |}
-        """.stripMargin
-      graphOp.createLabel(Json.parse(strJs))
-    }
-  }
-
-  def createCounter(policy: Counter): Unit = {
-    val newPolicy = policy.copy(hbaseTable = Some(makeHTableName(policy)))
-    prepareStorage(newPolicy)
-    counterModel.createServiceAction(newPolicy)
-  }
-
-  def deleteCounter(service: String, action: String): Option[Try[Unit]] = {
-    for {
-      policy <- counterModel.findByServiceAction(service, action, useCache = false)
-    } yield {
-      Try {
-        exactCounter(policy).destroy(policy)
-        if (policy.useRank) {
-          rankingCounter(policy).destroy(policy)
-        }
-        counterModel.deleteServiceAction(policy)
-      }
-    }
-  }
-
-  def prepareStorage(policy: Counter): Unit = {
-    if (policy.rateActionId.isEmpty) {
-      // if defined rate action, do not use exact counter
-      exactCounter(policy).prepare(policy)
-    }
-    if (policy.useRank) {
-      rankingCounter(policy).prepare(policy)
-    }
-  }
-
-  def prepareStorage(policy: Counter, version: Byte): Unit = {
-    // this function to prepare storage by version parameter instead of policy.version
-    prepareStorage(policy.copy(version = version))
-  }
-
-  private val exactCounterMap = Map(
-    s2.counter.VERSION_1 -> new ExactCounter(config, new ExactStorageAsyncHBase(config)),
-    s2.counter.VERSION_2 -> new ExactCounter(config, new ExactStorageGraph(config))
-  )
-  private val rankingCounterMap = Map(
-    s2.counter.VERSION_1 -> new RankingCounter(config, new RankingStorageRedis(config)),
-    s2.counter.VERSION_2 -> new RankingCounter(config, new RankingStorageGraph(config))
-  )
-
-  private val tablePrefixMap = Map (
-    s2.counter.VERSION_1 -> "s2counter",
-    s2.counter.VERSION_2 -> "s2counter_v2"
-  )
-
-  def exactCounter(version: Byte): ExactCounter = exactCounterMap(version)
-  def exactCounter(policy: Counter): ExactCounter = 
exactCounter(policy.version)
-  def rankingCounter(version: Byte): RankingCounter = 
rankingCounterMap(version)
-  def rankingCounter(policy: Counter): RankingCounter = 
rankingCounter(policy.version)
-
-  def makeHTableName(policy: Counter): String = {
-    Seq(tablePrefixMap(policy.version), policy.service, policy.ttl) ++ 
policy.dailyTtl mkString "_"
-  }
-}
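
For context, the removed CounterAdmin keeps one ExactCounter and one RankingCounter per storage version and dispatches on the policy's version byte; a minimal usage sketch, assuming a loaded Typesafe config (the commented policy value is illustrative, not part of the original API):

    import com.typesafe.config.ConfigFactory

    val admin = new CounterAdmin(ConfigFactory.load())
    // storage backends are resolved per version: VERSION_1 -> HBase/Redis, VERSION_2 -> graph-backed
    val exact = admin.exactCounter(s2.counter.VERSION_2)
    // createCounter fills in the HBase table name, e.g. "s2counter_v2_<service>_<ttl>"
    // admin.createCounter(policy)   // policy: s2.models.Counter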

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2counter_core/src/main/scala/s2/helper/DistributedScanner.scala
----------------------------------------------------------------------
diff --git a/s2counter_core/src/main/scala/s2/helper/DistributedScanner.scala 
b/s2counter_core/src/main/scala/s2/helper/DistributedScanner.scala
deleted file mode 100644
index dcf4d03..0000000
--- a/s2counter_core/src/main/scala/s2/helper/DistributedScanner.scala
+++ /dev/null
@@ -1,74 +0,0 @@
-package s2.helper
-
-import java.util
-import java.util.Comparator
-
-import com.google.common.primitives.SignedBytes
-import org.apache.hadoop.hbase.client._
-import org.apache.hadoop.hbase.util.Bytes
-
-/**
- * Created by hsleep([email protected]) on 15. 5. 21..
- */
-
-object DistributedScanner {
-  val BUCKET_BYTE_SIZE = Bytes.SIZEOF_BYTE
-
-  def getRealRowKey(result: Result): Array[Byte] = {
-    result.getRow.drop(BUCKET_BYTE_SIZE)
-  }
-}
-
-class DistributedScanner(table: Table, scan: Scan) extends 
AbstractClientScanner {
-  import DistributedScanner._
-
-  private val BYTE_MAX = BigInt(256)
-
-  private[helper] val scanners = {
-    for {
-      i <- 0 until BYTE_MAX.pow(BUCKET_BYTE_SIZE).toInt
-    } yield {
-      val bucketBytes: Array[Byte] = 
Bytes.toBytes(i).takeRight(BUCKET_BYTE_SIZE)
-      val newScan = new Scan(scan).setStartRow(bucketBytes ++ 
scan.getStartRow).setStopRow(bucketBytes ++ scan.getStopRow)
-      table.getScanner(newScan)
-    }
-  }
-
-  val resultCache = new util.TreeMap[Result, java.util.Iterator[Result]](new 
Comparator[Result] {
-    val comparator = SignedBytes.lexicographicalComparator()
-    override def compare(o1: Result, o2: Result): Int = {
-      comparator.compare(getRealRowKey(o1), getRealRowKey(o2))
-    }
-  })
-
-  lazy val initialized = {
-    val iterators = scanners.map(_.iterator()).filter(_.hasNext)
-    iterators.foreach { it =>
-      resultCache.put(it.next(), it)
-    }
-    iterators.nonEmpty
-  }
-
-  override def next(): Result = {
-    if (initialized) {
-      Option(resultCache.pollFirstEntry()).map { entry =>
-        val it = entry.getValue
-        if (it.hasNext) {
-          // fill cache
-          resultCache.put(it.next(), it)
-        }
-        entry.getKey
-      }.orNull
-    } else {
-      null
-    }
-  }
-
-  override def close(): Unit = {
-    for {
-      scanner <- scanners
-    } {
-      scanner.close()
-    }
-  }
-}
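
The deleted scanner fans one Scan out over all 256 one-byte bucket prefixes and merges the per-bucket results back into real-row-key order through the TreeMap cache; a rough usage sketch (the scanAll helper and its arguments are illustrative, not part of the original API):

    import org.apache.hadoop.hbase.TableName
    import org.apache.hadoop.hbase.client.{Connection, Scan}
    import s2.helper.DistributedScanner

    def scanAll(conn: Connection, tableName: String, scan: Scan): Iterator[Array[Byte]] = {
      val table = conn.getTable(TableName.valueOf(tableName))
      val scanner = new DistributedScanner(table, scan)
      // next() returns rows already ordered by the key without its bucket prefix
      Iterator.continually(scanner.next()).takeWhile(_ != null)
        .map(DistributedScanner.getRealRowKey)
      // the caller is expected to close the scanner and table when done
    }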

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2counter_core/src/main/scala/s2/helper/HashShardingJedis.scala
----------------------------------------------------------------------
diff --git a/s2counter_core/src/main/scala/s2/helper/HashShardingJedis.scala 
b/s2counter_core/src/main/scala/s2/helper/HashShardingJedis.scala
deleted file mode 100644
index 395486d..0000000
--- a/s2counter_core/src/main/scala/s2/helper/HashShardingJedis.scala
+++ /dev/null
@@ -1,156 +0,0 @@
-package s2.helper
-
-import com.typesafe.config.Config
-import org.slf4j.LoggerFactory
-import redis.clients.jedis.exceptions.JedisException
-import redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig}
-import s2.config.S2CounterConfig
-import s2.util.Hashes
-
-/**
- * Created by jay on 14. 10. 31..
- */
-class HashShardingJedis(config: Config) {
-  lazy val s2config = new S2CounterConfig(config)
-
-  private val log = LoggerFactory.getLogger(getClass)
-
-  val poolConfig = new JedisPoolConfig()
-  poolConfig.setMaxTotal(150)
-  poolConfig.setMaxIdle(50)
-  poolConfig.setMaxWaitMillis(200)
-
-  val jedisPools = s2config.REDIS_INSTANCES.map { case (host, port) =>
-    new JedisPool(poolConfig, host, port)
-  }
-  val jedisPoolSize = jedisPools.size
-
-  def getJedisPool(idx: Int): JedisPool = {
-    if(idx >= jedisPoolSize)
-      null
-    else
-      jedisPools(idx)
-  }
-
-  def getJedisPoolWithBucketname2(bucket: String): JedisPool = {
-    val hashedValue = Hashes.murmur3(bucket)
-    val idx = hashedValue % jedisPoolSize
-    getJedisPool(idx)
-  }
-
-  def getJedisPoolWithBucketname(bucket: String): (JedisPool, JedisPool) = {
-    val hashedValue = Hashes.murmur3(bucket)
-    val idx = hashedValue % jedisPoolSize
-    val secondaryIdx = if (jedisPoolSize <= 1) {
-      throw new Exception("sharding pool size must be greater than 1")
-    } else {
-      val newIdx = (hashedValue / jedisPoolSize) % (jedisPoolSize -1)
-      if(newIdx < idx) {
-        newIdx
-      } else {
-        newIdx +1
-      }
-    }
-    (getJedisPool(idx), getJedisPool(secondaryIdx))
-  }
-
-  def doBlockWithJedisInstace(f : Jedis => Any, fallBack : => Any, jedis : 
Jedis) = {
-    try {
-      f(jedis)
-    }
-    catch {
-      case e:JedisException => {
-        fallBack
-      }
-    }
-  }
-
-  def doBlockWithBucketName(f : Jedis => Any, fallBack : => Any, bucket : 
String) = {
-//    Logger.debug(s"start jedis do block")
-    //val (jedis_pool1, jedis_pool2) = getJedisPoolWithBucketname(bucket)
-    val jedis_pool1= getJedisPoolWithBucketname2(bucket)
-//    if(jedis_pool1 != null && jedis_pool2 != null) {
-    if(jedis_pool1 != null) {
-      var jedis1: Jedis = null
-//      var jedis2: Jedis = null
-      try {
-        jedis1 = jedis_pool1.getResource()
-//        jedis2 = jedis_pool2.getResource()
-        log.info(s">> Jedis Pool Active Num : ${jedis_pool1.getNumActive}")
-
-        /* val f1 = Future(f(jedis1))
-        val f2 = Future(f(jedis2))
-
-        val mixedFuture = Future.sequence(List(f1,f2))   */
-
-        val r1 = f(jedis1)
-        //val r2 = f(jedis2)
-
-        r1
-      }
-      catch {
-        case e:JedisException => {
-//          Logger.debug(s"following exception catched")
-//          Logger.debug(s"${e}")
-          jedis_pool1.returnBrokenResource(jedis1)
-//          jedis_pool2.returnBrokenResource(jedis2)
-
-          jedis1 = null
-//          jedis2 = null
-          fallBack
-        }
-      }
-      finally {
-        if (jedis1 != null) jedis_pool1.returnResource(jedis1)
-//        if (jedis2 != null) jedis_pool2.returnResource(jedis2)
-      }
-    }
-    else{
-//      Logger.debug(s"fallback executed")
-      fallBack
-    }
-  }
-  
-  def doBlockWithKey[T](key: String)(f: Jedis => T)(fallBack: => T) = {
-//    Logger.debug(s"start jedis do block")
-    val (jedis_pool1, jedis_pool2) = getJedisPoolWithBucketname(key)
-    if(jedis_pool1 != null && jedis_pool2 != null) {
-      var jedis1: Jedis = null
-      var jedis2: Jedis = null
-      try {
-        jedis1 = jedis_pool1.getResource()
-        jedis2 = jedis_pool2.getResource()
-
-        /* val f1 = Future(f(jedis1))
-        val f2 = Future(f(jedis2))
-
-        val mixedFuture = Future.sequence(List(f1,f2))   */
-
-        val r1 = f(jedis1)
-        //val r2 = f(jedis2)
-
-        r1
-      }
-      catch {
-        case e:JedisException => {
-//          Logger.debug(s"following exception catched")
-//          Logger.debug(s"${e}")
-          jedis_pool1.returnBrokenResource(jedis1)
-          jedis_pool2.returnBrokenResource(jedis2)
-
-          jedis1 = null
-          jedis2 = null
-          fallBack
-        }
-      }
-      finally {
-        if (jedis1 != null) jedis_pool1.returnResource(jedis1)
-        if (jedis2 != null) jedis_pool2.returnResource(jedis2)
-      }
-    }
-    else{
-//      Logger.debug(s"fallback executed")
-      fallBack
-    }
-  }
-}
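
Shard selection in the removed helper hashes the bucket with murmur3 and spreads it over the pool list, deriving a distinct secondary shard from the same hash; the same index arithmetic in isolation (hashedValue and poolSize stand in for s2.util.Hashes.murmur3 and the real pool size):

    // mirrors getJedisPoolWithBucketname: primary shard plus a different secondary shard
    def shardIndices(hashedValue: Int, poolSize: Int): (Int, Int) = {
      require(poolSize > 1, "sharding pool must have more than one instance")
      val primary = hashedValue % poolSize
      val candidate = (hashedValue / poolSize) % (poolSize - 1)
      val secondary = if (candidate < primary) candidate else candidate + 1
      (primary, secondary)
    }

    shardIndices(12345, 4)   // (1, 3): the secondary never collides with the primary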

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2counter_core/src/main/scala/s2/helper/Management.scala
----------------------------------------------------------------------
diff --git a/s2counter_core/src/main/scala/s2/helper/Management.scala 
b/s2counter_core/src/main/scala/s2/helper/Management.scala
deleted file mode 100644
index 266caba..0000000
--- a/s2counter_core/src/main/scala/s2/helper/Management.scala
+++ /dev/null
@@ -1,146 +0,0 @@
-package s2.helper
-
-import com.typesafe.config.Config
-import org.apache.hadoop.hbase.client.{Admin, ConnectionFactory, Durability}
-import org.apache.hadoop.hbase.io.compress.Compression
-import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding
-import org.apache.hadoop.hbase.regionserver.BloomType
-import org.apache.hadoop.hbase.util.Bytes
-import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, 
HTableDescriptor, TableName}
-import org.slf4j.LoggerFactory
-import redis.clients.jedis.ScanParams
-
-import scala.collection.JavaConversions._
-import scala.util.Random
-
-/**
-*  Created by hsleep([email protected]) on 15. 3. 30..
-*/
-class Management(config: Config) {
-  val withRedis = new HashShardingJedis(config)
-
-  val log = LoggerFactory.getLogger(this.getClass)
-  
-  def describe(zkAddr: String, tableName: String) = {
-    val admin = getAdmin(zkAddr)
-    val table = admin.getTableDescriptor(TableName.valueOf(tableName))
-
-    table.getColumnFamilies.foreach { cf =>
-      println(s"columnFamily: ${cf.getNameAsString}")
-      cf.getValues.foreach { case (k, v) =>
-        println(s"${Bytes.toString(k.get())} ${Bytes.toString(v.get())}")
-      }
-    }
-  }
-  
-  def setTTL(zkAddr: String, tableName: String, cfName: String, ttl: Int) = {
-    val admin = getAdmin(zkAddr)
-    val tableNameObj = TableName.valueOf(tableName)
-    val table = admin.getTableDescriptor(tableNameObj)
-
-    val cf = table.getFamily(cfName.getBytes)
-    cf.setTimeToLive(ttl)
-
-    admin.modifyColumn(tableNameObj, cf)
-  }
-
-  def getAdmin(zkAddr: String): Admin = {
-    val conf = HBaseConfiguration.create()
-    conf.set("hbase.zookeeper.quorum", zkAddr)
-    val conn = ConnectionFactory.createConnection(conf)
-    conn.getAdmin
-  }
-
-  def tableExists(zkAddr: String, tableName: String): Boolean = {
-    getAdmin(zkAddr).tableExists(TableName.valueOf(tableName))
-  }
-
-  def createTable(zkAddr: String, tableName: String, cfs: List[String], 
regionMultiplier: Int) = {
-    log.info(s"create table: $tableName on $zkAddr, $cfs, $regionMultiplier")
-    val admin = getAdmin(zkAddr)
-    val regionCount = admin.getClusterStatus.getServersSize * regionMultiplier
-    try {
-      val desc = new HTableDescriptor(TableName.valueOf(tableName))
-      desc.setDurability(Durability.ASYNC_WAL)
-      for (cf <- cfs) {
-        val columnDesc = new HColumnDescriptor(cf)
-          .setCompressionType(Compression.Algorithm.LZ4)
-          .setBloomFilterType(BloomType.ROW)
-          .setDataBlockEncoding(DataBlockEncoding.FAST_DIFF)
-          .setMaxVersions(1)
-          .setMinVersions(0)
-          .setBlocksize(32768)
-          .setBlockCacheEnabled(true)
-        desc.addFamily(columnDesc)
-      }
-
-      if (regionCount <= 1) admin.createTable(desc)
-      else admin.createTable(desc, getStartKey(regionCount), 
getEndKey(regionCount), regionCount)
-    } catch {
-      case e: Throwable =>
-        log.error(s"$zkAddr, $tableName failed with $e", e)
-        throw e
-    }
-  }
-
-  // row keys are distributed by murmur hash, so tables can be pre-split evenly.
-  private def getStartKey(regionCount: Int) = {
-    Bytes.toBytes(Int.MaxValue / regionCount)
-  }
-
-  private def getEndKey(regionCount: Int) = {
-    Bytes.toBytes(Int.MaxValue / regionCount * (regionCount - 1))
-  }
-
-  case class RedisScanIterator(scanParams: ScanParams = new 
ScanParams().count(100)) extends Iterator[String] {
-    val nextCursorId: collection.mutable.Map[Int, String] = 
collection.mutable.Map.empty[Int, String]
-    var innerIterator: Iterator[String] = _
-
-    for {
-      i <- 0 until withRedis.jedisPoolSize
-    } {
-      nextCursorId.put(i, "0")
-    }
-
-    def callScan(): Unit = {
-      if (nextCursorId.nonEmpty) {
-//        println(s"callScan: idx: $nextIdx, cursor: $nextCursorId")
-        val idx = Random.shuffle(nextCursorId.keys).head
-        val cursorId = nextCursorId(idx)
-        val pool = withRedis.getJedisPool(idx)
-        val conn = pool.getResource
-        try {
-          val result = conn.scan(cursorId, scanParams)
-          result.getStringCursor match {
-            case "0" =>
-              log.debug(s"end scan: idx: $idx, cursor: $cursorId")
-              nextCursorId.remove(idx)
-            case x: String =>
-              nextCursorId.put(idx, x)
-          }
-          innerIterator = result.getResult.toIterator
-        } finally {
-          pool.returnResource(conn)
-        }
-      }
-      else {
-        innerIterator = List.empty[String].toIterator
-      }
-    }
-
-    // initialize
-    callScan()
-
-    override def hasNext: Boolean = {
-      innerIterator.hasNext match {
-        case true =>
-          true
-        case false =>
-          callScan()
-          innerIterator.hasNext
-      }
-    }
-
-    override def next(): String = innerIterator.next()
-  }
-}
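
Since row keys are bucketed by hash, the removed Management pre-splits new tables evenly across the positive int range; the boundary arithmetic from getStartKey/getEndKey, shown standalone with an illustrative region count:

    // first and last split points for a table with regionCount regions
    def splitBounds(regionCount: Int): (Int, Int) =
      (Int.MaxValue / regionCount, Int.MaxValue / regionCount * (regionCount - 1))

    splitBounds(16)   // (134217727, 2013265905), i.e. 16 regions of width Int.MaxValue / 16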

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2counter_core/src/main/scala/s2/helper/WithHBase.scala
----------------------------------------------------------------------
diff --git a/s2counter_core/src/main/scala/s2/helper/WithHBase.scala 
b/s2counter_core/src/main/scala/s2/helper/WithHBase.scala
deleted file mode 100644
index ae3ff1e..0000000
--- a/s2counter_core/src/main/scala/s2/helper/WithHBase.scala
+++ /dev/null
@@ -1,101 +0,0 @@
-package s2.helper
-
-import com.stumbleupon.async.{Callback, Deferred}
-import com.typesafe.config.Config
-import org.apache.hadoop.hbase.client._
-import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
-import org.hbase.async.HBaseClient
-import org.slf4j.LoggerFactory
-import s2.config.S2CounterConfig
-
-import scala.concurrent.{Future, Promise}
-import scala.util.Try
-
-/**
- * Created by hsleep([email protected]) on 15. 6. 19..
- */
-class WithHBase(config: Config) {
-  lazy val logger = LoggerFactory.getLogger(this.getClass)
-  lazy val s2config = new S2CounterConfig(config)
-
-  lazy val zkQuorum = s2config.HBASE_ZOOKEEPER_QUORUM
-  lazy val defaultTableName = s2config.HBASE_TABLE_NAME
-
-  logger.info(s"$zkQuorum, $defaultTableName")
-
-  val hbaseConfig = HBaseConfiguration.create()
-  s2config.getConfigMap("hbase").foreach { case (k, v) =>
-    hbaseConfig.set(k, v)
-  }
-
-//  lazy val conn: HConnection = 
HConnectionManager.createConnection(hbaseConfig)
-  lazy val conn: Connection = ConnectionFactory.createConnection(hbaseConfig)
-
-  val writeBufferSize = 1024 * 1024 * 2   // 2MB
-
-//  def apply[T](op: Table => T): Try[T] = {
-//    Try {
-//      val table = conn.getTable(TableName.valueOf(defaultTableName))
-//      // do not keep failed operation in writer buffer
-//      table.setWriteBufferSize(writeBufferSize)
-//      try {
-//        op(table)
-//      } catch {
-//        case e: Throwable =>
-//          logger.error(s"Operation to table($defaultTableName) is failed: 
${e.getMessage}")
-//          throw e
-//      } finally {
-//        table.close()
-//      }
-//    }
-//  }
-  
-  def apply[T](tableName: String)(op: Table => T): Try[T] = {
-    Try {
-      val table = conn.getTable(TableName.valueOf(tableName))
-      // do not keep failed operations in the write buffer
-      table.setWriteBufferSize(writeBufferSize)
-      try {
-        op(table)
-      } catch {
-        case ex: Exception =>
-          logger.error(s"$ex: Operation to table($tableName) is failed")
-          throw ex
-      } finally {
-        table.close()
-      }
-    }
-  }
-}
-
-case class WithAsyncHBase(config: Config) {
-  lazy val logger = LoggerFactory.getLogger(this.getClass)
-  lazy val s2config = new S2CounterConfig(config)
-
-  lazy val zkQuorum = s2config.HBASE_ZOOKEEPER_QUORUM
-
-  val hbaseConfig = HBaseConfiguration.create()
-  s2config.getConfigMap("hbase").foreach { case (k, v) =>
-    hbaseConfig.set(k, v)
-  }
-
-//  lazy val conn: HConnection = 
HConnectionManager.createConnection(hbaseConfig)
-  lazy val client: HBaseClient = new HBaseClient(zkQuorum)
-
-  val writeBufferSize = 1024 * 1024 * 2   // 2MB
-
-  def apply[T](op: HBaseClient => Deferred[T]): Future[T] = {
-    val promise = Promise[T]()
-
-    op(client).addCallback(new Callback[Unit, T] {
-      def call(arg: T): Unit = {
-        promise.success(arg)
-      }
-    }).addErrback(new Callback[Unit, Exception] {
-      def call(ex: Exception): Unit = {
-        promise.failure(ex)
-      }
-    })
-    promise.future
-  }
-}
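
Both helpers are loan-pattern wrappers: WithHBase opens a Table, hands it to the block, and closes it, returning a Try; WithAsyncHBase bridges asynchbase's Deferred into a Scala Future. A usage sketch for the synchronous path (the table name and row key are illustrative):

    import com.typesafe.config.ConfigFactory
    import org.apache.hadoop.hbase.client.Get
    import s2.helper.WithHBase

    val withHBase = new WithHBase(ConfigFactory.load())

    // the Table is created, passed to the block, and closed in the finally branch
    val rowExists: scala.util.Try[Boolean] = withHBase("s2counter") { table =>
      table.exists(new Get("some-row".getBytes("UTF-8")))
    }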

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2counter_core/src/main/scala/s2/helper/WithRedis.scala
----------------------------------------------------------------------
diff --git a/s2counter_core/src/main/scala/s2/helper/WithRedis.scala 
b/s2counter_core/src/main/scala/s2/helper/WithRedis.scala
deleted file mode 100644
index 2046577..0000000
--- a/s2counter_core/src/main/scala/s2/helper/WithRedis.scala
+++ /dev/null
@@ -1,62 +0,0 @@
-package s2.helper
-
-import com.typesafe.config.Config
-import org.slf4j.LoggerFactory
-import redis.clients.jedis.exceptions.JedisException
-import redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig}
-import s2.config.S2CounterConfig
-import s2.util.Hashes
-
-import scala.util.Try
-
-/**
- * Created by hsleep([email protected]) on 15. 6. 19..
- */
-class WithRedis(config: Config) {
-  lazy val s2config = new S2CounterConfig(config)
-
-  private val log = LoggerFactory.getLogger(getClass)
-
-  val poolConfig = new JedisPoolConfig()
-  poolConfig.setMaxTotal(150)
-  poolConfig.setMaxIdle(50)
-  poolConfig.setMaxWaitMillis(200)
-
-  val jedisPools = s2config.REDIS_INSTANCES.map { case (host, port) =>
-    new JedisPool(poolConfig, host, port)
-  }
-
-  def getBucketIdx(key: String): Int = {
-    Hashes.murmur3(key) % jedisPools.size
-  }
-
-  def doBlockWithIndex[T](idx: Int)(f: Jedis => T): Try[T] = {
-    Try {
-      val pool = jedisPools(idx)
-
-      var jedis: Jedis = null
-
-      try {
-        jedis = pool.getResource
-
-        f(jedis)
-      }
-      catch {
-        case e: JedisException =>
-          pool.returnBrokenResource(jedis)
-
-          jedis = null
-          throw e
-      }
-      finally {
-        if (jedis != null) {
-          pool.returnResource(jedis)
-        }
-      }
-    }
-  }
-
-  def doBlockWithKey[T](key: String)(f: Jedis => T): Try[T] = {
-    doBlockWithIndex(getBucketIdx(key))(f)
-  }
-}
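
WithRedis applies the same loan pattern to Jedis, routing each key to a pool by murmur3 hash; a brief sketch (the key is illustrative):

    import com.typesafe.config.ConfigFactory
    import s2.helper.WithRedis

    val withRedis = new WithRedis(ConfigFactory.load())

    // getBucketIdx picks the shard; the connection is returned (or marked broken) by the helper
    val cached: scala.util.Try[String] = withRedis.doBlockWithKey("counter:some-key") { jedis =>
      jedis.get("counter:some-key")
    }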

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2counter_core/src/main/scala/s2/models/CachedDBModel.scala
----------------------------------------------------------------------
diff --git a/s2counter_core/src/main/scala/s2/models/CachedDBModel.scala 
b/s2counter_core/src/main/scala/s2/models/CachedDBModel.scala
deleted file mode 100644
index 5da0265..0000000
--- a/s2counter_core/src/main/scala/s2/models/CachedDBModel.scala
+++ /dev/null
@@ -1,14 +0,0 @@
-package s2.models
-
-import s2.util.{CollectionCache, CollectionCacheConfig}
-import scalikejdbc.AutoSession
-
-/**
- * Created by hsleep([email protected]) on 15. 5. 27..
- */
-trait CachedDBModel[T] {
-  implicit val s = AutoSession
-
-  val cacheConfig: CollectionCacheConfig
-  lazy val cache = new CollectionCache[Option[T]](cacheConfig)
-}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2counter_core/src/main/scala/s2/models/Counter.scala
----------------------------------------------------------------------
diff --git a/s2counter_core/src/main/scala/s2/models/Counter.scala 
b/s2counter_core/src/main/scala/s2/models/Counter.scala
deleted file mode 100644
index e26e071..0000000
--- a/s2counter_core/src/main/scala/s2/models/Counter.scala
+++ /dev/null
@@ -1,213 +0,0 @@
-package s2.models
-
-import com.typesafe.config.Config
-import s2.config.S2CounterConfig
-import s2.util.{CollectionCache, CollectionCacheConfig}
-import scalikejdbc._
-
-/**
-*  Created by hsleep([email protected]) on 15. 1. 30..
-*/
-case class Counter(id: Int, useFlag: Boolean, version: Byte, service: String, 
action: String,
-                   itemType: Counter.ItemType.ItemType, autoComb: Boolean, 
dimension: String,
-                   useProfile: Boolean, bucketImpId: Option[String],
-                   useRank: Boolean,
-                   ttl: Int, dailyTtl: Option[Int], hbaseTable: Option[String],
-                   intervalUnit: Option[String],
-                   rateActionId: Option[Int], rateBaseId: Option[Int], 
rateThreshold: Option[Int]) {
-  val intervals: Array[String] = intervalUnit.map(s => 
s.split(',')).getOrElse(Array("t", "M", "d", "H"))
-  val dimensionSp = if (dimension.isEmpty) Array.empty[String] else 
dimension.split(',').sorted
-
-  val dimensionList: List[Array[String]] = {
-    autoComb match {
-      case true =>
-        for {
-          i <- (0 to math.min(4, dimensionSp.length)).toList
-          combines <- dimensionSp.combinations(i)
-        } yield {
-          combines
-        }
-      case false =>
-        dimensionSp isEmpty match {
-          case true => List(Array())
-          case false => dimensionSp.toList.map(sp => sp.split('.'))
-        }
-    }
-  }
-
-  val dimensionSet: Set[Set[String]] = {
-    for {
-      arr <- dimensionList
-    } yield {
-      arr.toSet
-    }
-  }.toSet
-
-  val isRateCounter: Boolean = rateActionId.isDefined && rateBaseId.isDefined 
&& rateActionId != rateBaseId
-  val isTrendCounter: Boolean = rateActionId.isDefined && rateBaseId.isDefined 
&& rateActionId == rateBaseId
-}
-
-object Counter extends SQLSyntaxSupport[Counter] {
-  object ItemType extends Enumeration {
-    type ItemType = Value
-    val INT, LONG, STRING, BLOB = Value
-  }
-
-  def apply(c: SyntaxProvider[Counter])(rs: WrappedResultSet): Counter = 
apply(c.resultName)(rs)
-  def apply(r: ResultName[Counter])(rs: WrappedResultSet): Counter = {
-    lazy val itemType = Counter.ItemType(rs.int(r.itemType))
-    Counter(rs.int(r.id), rs.boolean(r.useFlag), rs.byte(r.version), 
rs.string(r.service), rs.string(r.action),
-      itemType, rs.boolean(r.autoComb), rs.string(r.dimension),
-      rs.boolean(r.useProfile), rs.stringOpt(r.bucketImpId),
-      rs.boolean(r.useRank),
-      rs.int(r.ttl), rs.intOpt(r.dailyTtl), rs.stringOpt(r.hbaseTable), 
rs.stringOpt(r.intervalUnit),
-      rs.intOpt(r.rateActionId), rs.intOpt(r.rateBaseId), 
rs.intOpt(r.rateThreshold))
-  }
-
-  def apply(useFlag: Boolean, version: Byte, service: String, action: String, 
itemType: Counter.ItemType.ItemType,
-            autoComb: Boolean, dimension: String, useProfile: Boolean = false, 
bucketImpId: Option[String] = None,
-            useRank: Boolean = false, ttl: Int = 259200, dailyTtl: Option[Int] 
= None,
-            hbaseTable: Option[String] = None, intervalUnit: Option[String] = 
None,
-            rateActionId: Option[Int] = None, rateBaseId: Option[Int] = None, 
rateThreshold: Option[Int] = None): Counter = {
-    Counter(-1, useFlag, version, service, action, itemType, autoComb, 
dimension,
-      useProfile, bucketImpId,
-      useRank, ttl, dailyTtl, hbaseTable,
-      intervalUnit, rateActionId, rateBaseId, rateThreshold)
-  }
-}
-
-class CounterModel(config: Config) extends CachedDBModel[Counter] {
-  private lazy val s2Config = new S2CounterConfig(config)
-  // enable negative cache
-  override val cacheConfig: CollectionCacheConfig =
-    new CollectionCacheConfig(s2Config.CACHE_MAX_SIZE, 
s2Config.CACHE_TTL_SECONDS,
-      negativeCache = true, s2Config.CACHE_NEGATIVE_TTL_SECONDS)
-
-  val c = Counter.syntax("c")
-  val r = c.result
-
-  val multiCache = new CollectionCache[Seq[Counter]](cacheConfig)
-
-  def findById(id: Int, useCache: Boolean = true): Option[Counter] = {
-    lazy val sql = withSQL {
-      selectFrom(Counter as c).where.eq(c.id, id).and.eq(c.useFlag, 1)
-    }.map(Counter(c))
-
-    if (useCache) {
-      cache.withCache(s"_id:$id") {
-        sql.single().apply()
-      }
-    } else {
-      sql.single().apply()
-    }
-  }
-
-  def findByServiceAction(service: String, action: String, useCache: Boolean = 
true): Option[Counter] = {
-    lazy val sql = withSQL {
-      selectFrom(Counter as c).where.eq(c.service, service).and.eq(c.action, 
action).and.eq(c.useFlag, 1)
-    }.map(Counter(c))
-
-    if (useCache) {
-      cache.withCache(s"$service.$action") {
-        sql.single().apply()
-      }
-    }
-    else {
-      sql.single().apply()
-    }
-  }
-
-  def findByRateActionId(rateActionId: Int, useCache: Boolean = true): 
Seq[Counter] = {
-    lazy val sql = withSQL {
-      selectFrom(Counter as c).where.eq(c.rateActionId, 
rateActionId).and.ne(c.rateBaseId, rateActionId).and.eq(c.useFlag, 1)
-    }.map(Counter(c))
-
-    if (useCache) {
-      multiCache.withCache(s"_rate_action_id.$rateActionId") {
-        sql.list().apply()
-      }
-    } else {
-      sql.list().apply()
-    }
-  }
-  
-  def findByRateBaseId(rateBaseId: Int, useCache: Boolean = true): 
Seq[Counter] = {
-    lazy val sql = withSQL {
-      selectFrom(Counter as c).where.eq(c.rateBaseId, 
rateBaseId).and.ne(c.rateActionId, rateBaseId).and.eq(c.useFlag, 1)
-    }.map(Counter(c))
-
-    if (useCache) {
-      multiCache.withCache(s"_rate_base_id.$rateBaseId") {
-        sql.list().apply()
-      }
-    } else {
-      sql.list().apply()
-    }
-  }
-
-  def findByTrendActionId(trendActionId: Int, useCache: Boolean = true): 
Seq[Counter] = {
-    lazy val sql = withSQL {
-      selectFrom(Counter as c).where.eq(c.rateActionId, 
trendActionId).and.eq(c.rateBaseId, trendActionId).and.eq(c.useFlag, 1)
-    }.map(Counter(c))
-
-    if (useCache) {
-      multiCache.withCache(s"_trend_action_id.$trendActionId") {
-        sql.list().apply()
-      }
-    } else {
-      sql.list().apply()
-    }
-  }
-
-  def createServiceAction(policy: Counter): Unit = {
-    withSQL {
-      val c = Counter.column
-      insert.into(Counter).namedValues(
-        c.useFlag -> policy.useFlag,
-        c.version -> policy.version,
-        c.service -> policy.service,
-        c.action -> policy.action,
-        c.itemType -> policy.itemType.id,
-        c.autoComb -> policy.autoComb,
-        c.dimension -> policy.dimension,
-        c.useProfile -> policy.useProfile,
-        c.bucketImpId -> policy.bucketImpId,
-        c.useRank -> policy.useRank,
-        c.ttl -> policy.ttl,
-        c.dailyTtl -> policy.dailyTtl,
-        c.hbaseTable -> policy.hbaseTable,
-        c.intervalUnit -> policy.intervalUnit,
-        c.rateActionId -> policy.rateActionId,
-        c.rateBaseId -> policy.rateBaseId,
-        c.rateThreshold -> policy.rateThreshold
-      )
-    }.update().apply()
-  }
-
-  def updateServiceAction(policy: Counter): Unit = {
-    withSQL {
-      val c = Counter.column
-      update(Counter).set(
-        c.autoComb -> policy.autoComb,
-        c.dimension -> policy.dimension,
-        c.useProfile -> policy.useProfile,
-        c.bucketImpId -> policy.bucketImpId,
-        c.useRank -> policy.useRank,
-        c.intervalUnit -> policy.intervalUnit,
-        c.rateActionId -> policy.rateActionId,
-        c.rateBaseId -> policy.rateBaseId,
-        c.rateThreshold -> policy.rateThreshold
-      ).where.eq(c.id, policy.id)
-    }.update().apply()
-  }
-
-  def deleteServiceAction(policy: Counter): Unit = {
-    withSQL {
-      val c = Counter.column
-      update(Counter).set(
-        c.action -> s"deleted_${System.currentTimeMillis()}_${policy.action}",
-        c.useFlag -> false
-      ).where.eq(c.id, policy.id)
-    }.update().apply()
-  }
-}
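
The interesting piece of the removed model is how autoComb expands the comma-separated dimension string into every combination of at most four dimensions; a small illustration using the Counter.apply overload above (service, action, and dimensions are placeholders):

    import s2.models.Counter

    val policy = Counter(useFlag = true, version = 2.toByte, service = "svc", action = "view",
      itemType = Counter.ItemType.STRING, autoComb = true, dimension = "age,gender,os")

    // all combinations of 0..3 of the sorted dimensions:
    // "", "age", "gender", "os", "age.gender", "age.os", "gender.os", "age.gender.os"
    policy.dimensionList.map(_.mkString("."))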

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2counter_core/src/main/scala/s2/models/DBModel.scala
----------------------------------------------------------------------
diff --git a/s2counter_core/src/main/scala/s2/models/DBModel.scala 
b/s2counter_core/src/main/scala/s2/models/DBModel.scala
deleted file mode 100644
index 6cb34b9..0000000
--- a/s2counter_core/src/main/scala/s2/models/DBModel.scala
+++ /dev/null
@@ -1,27 +0,0 @@
-package s2.models
-
-import com.typesafe.config.Config
-import s2.config.S2CounterConfig
-import scalikejdbc._
-
-/**
- * Created by alec on 15. 3. 31..
- */
-object DBModel {
-  private var initialized = false
-  
-  def initialize(config: Config): Unit = {
-    if (!initialized) {
-      this synchronized {
-        if (!initialized) {
-          val s2Config = new S2CounterConfig(config)
-          Class.forName(s2Config.DB_DEFAULT_DRIVER)
-          val settings = ConnectionPoolSettings(initialSize = 0, maxSize = 10, 
connectionTimeoutMillis = 5000L, validationQuery = "select 1;")
-
-          ConnectionPool.singleton(s2Config.DB_DEFAULT_URL, 
s2Config.DB_DEFAULT_USER, s2Config.DB_DEFAULT_PASSWORD, settings)
-          initialized = true
-        }
-      }
-    }
-  }
-}
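
DBModel.initialize is a double-checked, idempotent setup of the scalikejdbc connection pool, so it can be called before any CounterModel use without worrying about repeats; the intended call order, with placeholder service/action names:

    import com.typesafe.config.ConfigFactory
    import s2.models.{CounterModel, DBModel}

    val config = ConfigFactory.load()
    DBModel.initialize(config)              // safe to call more than once
    val counterModel = new CounterModel(config)
    val maybePolicy = counterModel.findByServiceAction("svc", "view", useCache = false)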

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2counter_core/src/main/scala/s2/util/CartesianProduct.scala
----------------------------------------------------------------------
diff --git a/s2counter_core/src/main/scala/s2/util/CartesianProduct.scala 
b/s2counter_core/src/main/scala/s2/util/CartesianProduct.scala
deleted file mode 100644
index 2077e3f..0000000
--- a/s2counter_core/src/main/scala/s2/util/CartesianProduct.scala
+++ /dev/null
@@ -1,11 +0,0 @@
-package s2.util
-
-/**
- * Created by hsleep([email protected]) on 15. 6. 19..
- */
-object CartesianProduct {
-  def apply[T](xss: List[List[T]]): List[List[T]] = xss match {
-    case Nil => List(Nil)
-    case h :: t => for(xh <- h; xt <- apply(t)) yield xh :: xt
-  }
-}
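
CartesianProduct is a plain recursive expansion: every way of picking one element from each inner list, preserving order. For example:

    s2.util.CartesianProduct(List(List("a", "b"), List("1", "2")))
    // => List(List("a", "1"), List("a", "2"), List("b", "1"), List("b", "2"))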

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2counter_core/src/main/scala/s2/util/CollectionCache.scala
----------------------------------------------------------------------
diff --git a/s2counter_core/src/main/scala/s2/util/CollectionCache.scala 
b/s2counter_core/src/main/scala/s2/util/CollectionCache.scala
deleted file mode 100644
index 122f87a..0000000
--- a/s2counter_core/src/main/scala/s2/util/CollectionCache.scala
+++ /dev/null
@@ -1,69 +0,0 @@
-package s2.util
-
-import java.net.InetAddress
-import java.util.concurrent.TimeUnit
-
-import com.google.common.cache.{Cache, CacheBuilder}
-import org.slf4j.LoggerFactory
-
-import scala.concurrent.{ExecutionContext, Future}
-import scala.language.{postfixOps, reflectiveCalls}
-
-/**
- * Created by hsleep([email protected]) on 15. 7. 1..
- */
-case class CollectionCacheConfig(maxSize: Int, ttl: Int, negativeCache: 
Boolean = false, negativeTTL: Int = 600)
-
-class CollectionCache[C <: { def nonEmpty: Boolean; def isEmpty: Boolean } 
](config: CollectionCacheConfig) {
-  private val cache: Cache[String, C] = CacheBuilder.newBuilder()
-    .expireAfterWrite(config.ttl, TimeUnit.SECONDS)
-    .maximumSize(config.maxSize)
-    .build[String, C]()
-
-//  private lazy val cache = new SynchronizedLruMap[String, (C, 
Int)](config.maxSize)
-  private lazy val className = this.getClass.getSimpleName
-
-  private lazy val log = LoggerFactory.getLogger(this.getClass)
-  val localHostname = InetAddress.getLocalHost.getHostName
-
-  def size = cache.size
-  val maxSize = config.maxSize
-
-  // cache statistics
-  def getStatsString: String = {
-    s"$localHostname ${cache.stats().toString}"
-  }
-
-  def withCache(key: String)(op: => C): C = {
-    Option(cache.getIfPresent(key)) match {
-      case Some(r) => r
-      case None =>
-        val r = op
-        if (r.nonEmpty || config.negativeCache) {
-          cache.put(key, r)
-        }
-        r
-    }
-  }
-
-  def withCacheAsync(key: String)(op: => Future[C])(implicit ec: 
ExecutionContext): Future[C] = {
-    Option(cache.getIfPresent(key)) match {
-      case Some(r) => Future.successful(r)
-      case None =>
-        op.map { r =>
-          if (r.nonEmpty || config.negativeCache) {
-            cache.put(key, r)
-          }
-          r
-        }
-    }
-  }
-
-  def purgeKey(key: String) = {
-    cache.invalidate(key)
-  }
-
-  def contains(key: String): Boolean = {
-    Option(cache.getIfPresent(key)).nonEmpty
-  }
-}
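
The cache is keyed by String and constrained to collection-like values (anything exposing nonEmpty/isEmpty), which is what lets empty results be negative-cached; a usage sketch with illustrative sizes and TTLs:

    import s2.util.{CollectionCache, CollectionCacheConfig}

    // 10,000 entries, 60s TTL; empty results are also cached because negativeCache = true
    val cacheConfig = CollectionCacheConfig(maxSize = 10000, ttl = 60, negativeCache = true)
    val cache = new CollectionCache[Option[String]](cacheConfig)

    val value: Option[String] = cache.withCache("some-key") {
      None   // an expensive lookup would go here; the miss itself gets cached
    }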
