http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2counter_core/src/main/scala/s2/util/FunctionParser.scala
----------------------------------------------------------------------
diff --git a/s2counter_core/src/main/scala/s2/util/FunctionParser.scala 
b/s2counter_core/src/main/scala/s2/util/FunctionParser.scala
deleted file mode 100644
index 2454b0f..0000000
--- a/s2counter_core/src/main/scala/s2/util/FunctionParser.scala
+++ /dev/null
@@ -1,24 +0,0 @@
-package s2.util
-
-/**
- * Created by hsleep([email protected]) on 15. 6. 29..
- */
-object FunctionParser {
-  val funcRe = """([a-zA-Z_]+)(\((\d+)?\))?""".r
-
-  def apply(str: String): Option[(String, String)] = {
-    str match {
-      case funcRe(funcName, funcParam, funcArg) =>
-        funcName match {
-          case x: String =>
-            Some((funcName, funcArg match {
-              case x: String => funcArg
-              case null => ""
-            }))
-          case null => None
-        }
-      case _ =>
-        None
-    }
-  }
-}
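
For reference, a minimal standalone sketch (not part of this commit) of what the removed parser accepts: a function name with an optional numeric argument, e.g. "top", "top()" or "top(10)".

object FunctionParserSketch {
  // same regex as the removed object: a name plus an optional "(digits)" suffix
  val funcRe = """([a-zA-Z_]+)(\((\d+)?\))?""".r

  def parse(str: String): Option[(String, String)] = str match {
    case funcRe(name, _, arg) => Some(name -> Option(arg).getOrElse(""))
    case _                    => None
  }

  def main(args: Array[String]): Unit = {
    println(parse("top(10)")) // Some((top,10))
    println(parse("sum()"))   // Some((sum,))
    println(parse("count"))   // Some((count,))
    println(parse("10"))      // None: a bare number is not a function
  }
}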

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2counter_core/src/main/scala/s2/util/Hashes.scala
----------------------------------------------------------------------
diff --git a/s2counter_core/src/main/scala/s2/util/Hashes.scala 
b/s2counter_core/src/main/scala/s2/util/Hashes.scala
deleted file mode 100644
index 2edbcd8..0000000
--- a/s2counter_core/src/main/scala/s2/util/Hashes.scala
+++ /dev/null
@@ -1,24 +0,0 @@
-package s2.util
-
-import org.apache.hadoop.hbase.util.Bytes
-
-import scala.util.hashing.MurmurHash3
-
-/**
- * Created by hsleep([email protected]) on 15. 5. 27..
- */
-object Hashes {
-  def sha1(s: String): String = {
-    val md = java.security.MessageDigest.getInstance("SHA-1")
-    Bytes.toHex(md.digest(s.getBytes("UTF-8")))
-  }
-  
-  private def positiveHash(h: Int): Int = {
-    if (h < 0) -1 * (h + 1) else h
-  }
-
-  def murmur3(s: String): Int = {
-    val hash = MurmurHash3.stringHash(s)
-    positiveHash(hash)
-  }
-}
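
A usage sketch for the removed helpers, assuming they survive the package move to org.apache.s2graph.counter.util that this commit performs: sha1 yields a 40-character hex digest, while murmur3 folds negative hashes into the non-negative range so the result is safe for modulo bucketing.

// Hypothetical call sites; `Hashes` here refers to the relocated object.
val digest = Hashes.sha1("s2graph")         // 40 hex characters
val bucket = Hashes.murmur3("s2graph") % 20 // always >= 0, usable as a partition key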

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2counter_core/src/main/scala/s2/util/ReduceMapValue.scala
----------------------------------------------------------------------
diff --git a/s2counter_core/src/main/scala/s2/util/ReduceMapValue.scala 
b/s2counter_core/src/main/scala/s2/util/ReduceMapValue.scala
deleted file mode 100644
index 9c13fa0..0000000
--- a/s2counter_core/src/main/scala/s2/util/ReduceMapValue.scala
+++ /dev/null
@@ -1,12 +0,0 @@
-package s2.util
-
-/**
- * Created by hsleep([email protected]) on 15. 7. 20..
- */
-class ReduceMapValue[T, U](op: (U, U) => U, default: U) {
-  def apply(m1: Map[T, U], m2: Map[T, U]): Map[T, U] = {
-    m1 ++ m2.map { case (k, v) =>
-      k -> op(m1.getOrElse(k, default), v)
-    }
-  }
-}
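
The removed helper merges two maps key-wise with a binary operator; the same logic reappears as CounterFunctions.reduceValue later in this commit. A standalone sketch:

// Self-contained equivalent of new ReduceMapValue[String, Long](_ + _, 0L)
def mergeCounts(m1: Map[String, Long], m2: Map[String, Long]): Map[String, Long] =
  m1 ++ m2.map { case (k, v) => k -> (m1.getOrElse(k, 0L) + v) }

println(mergeCounts(Map("a" -> 1L, "b" -> 2L), Map("b" -> 3L, "c" -> 4L)))
// Map(a -> 1, b -> 5, c -> 4)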

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2counter_core/src/main/scala/s2/util/Retry.scala
----------------------------------------------------------------------
diff --git a/s2counter_core/src/main/scala/s2/util/Retry.scala 
b/s2counter_core/src/main/scala/s2/util/Retry.scala
deleted file mode 100644
index d1f7213..0000000
--- a/s2counter_core/src/main/scala/s2/util/Retry.scala
+++ /dev/null
@@ -1,47 +0,0 @@
-package s2.util
-
-import scala.annotation.tailrec
-import scala.concurrent.{ExecutionContext, Future, Promise}
-import scala.util.{Failure, Success, Try}
-
-/**
- * Created by hsleep([email protected]) on 15. 1. 6..
- */
-object Retry {
-  @tailrec
-  def apply[T](n: Int, withSleep: Boolean = true, tryCount: Int = 0)(fn: => T): T = {
-    Try { fn } match {
-      case Success(x) => x
-      case Failure(e) if e.isInstanceOf[RetryStopException] => throw e.getCause
-      case _ if n > 1 =>
-        // backoff
-        if (withSleep) Thread.sleep(tryCount * 1000)
-        apply(n - 1, withSleep, tryCount + 1)(fn)
-      case Failure(e) => throw e
-    }
-  }
-}
-
-object RetryAsync {
-  def apply[T](n: Int, withSleep: Boolean = true, tryCount: Int = 0)(fn: => Future[T])(implicit ex: ExecutionContext): Future[T] = {
-    val promise = Promise[T]()
-    fn onComplete {
-      case Success(x) => promise.success(x)
-      case Failure(e) if e.isInstanceOf[RetryStopException] => promise.failure(e.getCause)
-      case _ if n > 1 =>
-        // backoff
-        if (withSleep) Thread.sleep(tryCount * 1000)
-        apply(n - 1, withSleep, tryCount + 1)(fn)
-      case Failure(e) => promise.failure(e)
-    }
-    promise.future
-  }
-}
-
-class RetryStopException(message: String, cause: Throwable)
-  extends Exception(message, cause) {
-
-  def this(message: String) = this(message, null)
-
-  def this(cause: Throwable) = this(cause.toString, cause)
-}
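
A usage sketch for the retry helpers (the test spec below already imports the relocated org.apache.s2graph.counter.util.Retry): up to n attempts with a linear back-off sleep between tries, and a RetryStopException aborts immediately, rethrowing its cause. Names such as fetchTopKFromStorage, policyIsMissing and updateCounter are hypothetical placeholders.

// Hypothetical call sites; `Retry` and `RetryStopException` refer to the relocated classes.
val topK = Retry(3) {
  fetchTopKFromStorage()            // assumed to throw on transient storage errors
}

Retry(5, withSleep = false) {
  if (policyIsMissing)              // assumed predicate: retrying cannot help here
    throw new RetryStopException(new IllegalStateException("no counter policy"))
  updateCounter()
}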

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2counter_core/src/main/scala/s2/util/SplitBytes.scala
----------------------------------------------------------------------
diff --git a/s2counter_core/src/main/scala/s2/util/SplitBytes.scala 
b/s2counter_core/src/main/scala/s2/util/SplitBytes.scala
deleted file mode 100644
index a5ca998..0000000
--- a/s2counter_core/src/main/scala/s2/util/SplitBytes.scala
+++ /dev/null
@@ -1,24 +0,0 @@
-package s2.util
-
-/**
- * Created by hsleep([email protected]) on 15. 6. 12..
- */
-object SplitBytes {
-  def apply(bytes: Array[Byte], sizes: Seq[Int]): Seq[Array[Byte]] = {
-    if (sizes.sum > bytes.length) {
-      throw new Exception(s"sizes.sum bigger than bytes.length ${sizes.sum} > ${bytes.length}} ")
-    }
-
-    var position = 0
-    val rtn = {
-      for {
-        size <- sizes
-      } yield {
-        val slice = bytes.slice(position, position + size)
-        position += size
-        slice
-      }
-    }
-    rtn ++ Seq(bytes.drop(position))
-  }
-}
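
A standalone sketch of the removed splitter: it cuts the array into the requested sizes and always appends whatever bytes remain as a final slice.

def splitBytes(bytes: Array[Byte], sizes: Seq[Int]): Seq[Array[Byte]] = {
  require(sizes.sum <= bytes.length, s"sizes.sum ${sizes.sum} > ${bytes.length}")
  var position = 0
  val slices = sizes.map { size =>
    val slice = bytes.slice(position, position + size)
    position += size
    slice
  }
  slices :+ bytes.drop(position)   // the remainder is always appended, possibly empty
}

println(splitBytes(Array[Byte](1, 2, 3, 4, 5), Seq(2, 1)).map(_.toSeq))
// three slices: [1, 2], [3], [4, 5]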

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2counter_core/src/main/scala/s2/util/UnitConverter.scala
----------------------------------------------------------------------
diff --git a/s2counter_core/src/main/scala/s2/util/UnitConverter.scala 
b/s2counter_core/src/main/scala/s2/util/UnitConverter.scala
deleted file mode 100644
index fb0b0d0..0000000
--- a/s2counter_core/src/main/scala/s2/util/UnitConverter.scala
+++ /dev/null
@@ -1,28 +0,0 @@
-package s2.util
-
-/**
- * Created by hsleep([email protected]) on 15. 4. 3..
- */
-object UnitConverter {
-  def toMillis(ts: Int): Long = {
-    ts * 1000L
-  }
-
-  def toMillis(ts: Long): Long = {
-    if (ts <= Int.MaxValue) {
-      ts * 1000
-    } else {
-      ts
-    }
-  }
-
-  def toMillis(s: String): Long = {
-    toMillis(s.toLong)
-  }
-
-  def toHours(ts: Long): Long = {
-    toMillis(ts) / HOUR_MILLIS * HOUR_MILLIS
-  }
-
-  val HOUR_MILLIS = 60 * 60 * 1000
-}
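
A usage sketch (the loader code below imports the relocated org.apache.s2graph.counter.util.UnitConverter): second-precision timestamps are promoted to milliseconds, values already in milliseconds pass through, and toHours floors a timestamp to the start of its hour, returned in milliseconds despite the name.

// Hypothetical call sites; `UnitConverter` refers to the relocated object.
UnitConverter.toMillis(1427082276)      // seconds         -> 1427082276000L
UnitConverter.toMillis(1427082276804L)  // already millis  -> unchanged
UnitConverter.toHours(1427082276804L)   // -> 1427079600000L, the enclosing hour boundary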

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2counter_core/src/test/scala/org/apache/s2graph/counter/core/RankingCounterSpec.scala
----------------------------------------------------------------------
diff --git 
a/s2counter_core/src/test/scala/org/apache/s2graph/counter/core/RankingCounterSpec.scala
 
b/s2counter_core/src/test/scala/org/apache/s2graph/counter/core/RankingCounterSpec.scala
new file mode 100644
index 0000000..1ae7b12
--- /dev/null
+++ 
b/s2counter_core/src/test/scala/org/apache/s2graph/counter/core/RankingCounterSpec.scala
@@ -0,0 +1,166 @@
+package org.apache.s2graph.counter.core
+
+import com.typesafe.config.ConfigFactory
+import org.apache.s2graph.core.mysqls.Label
+import org.apache.s2graph.core.{Graph, Management}
+import org.apache.s2graph.counter.config.S2CounterConfig
+import org.apache.s2graph.counter.core.TimedQualifier.IntervalUnit
+import org.apache.s2graph.counter.core.v2.{GraphOperation, RankingStorageGraph}
+import org.apache.s2graph.counter.helper.CounterAdmin
+import org.apache.s2graph.counter.models.{Counter, CounterModel, DBModel}
+import org.apache.s2graph.counter.util.Retry
+import org.specs2.mutable.Specification
+import org.specs2.specification.BeforeAfterAll
+import play.api.libs.json.Json
+
+import scala.util.{Failure, Random, Success, Try}
+
+class RankingCounterSpec extends Specification with BeforeAfterAll {
+  val config = ConfigFactory.load()
+  DBModel.initialize(config)
+  val counterModel = new CounterModel(config)
+  val admin = new CounterAdmin(config)
+
+  val s2config = new S2CounterConfig(config)
+
+//  val rankingCounterV1 = new RankingCounter(config, new RankingStorageV1(config))
+//  val rankingCounterV2 = new RankingCounter(config, new RankingStorageV2(config))
+  val rankingCounterV2 = new RankingCounter(config, new RankingStorageGraph(config))
+
+//  "RankingCounterV1" >> {
+//    val policy = counterModel.findByServiceAction("test", "test_action", useCache = false).get
+//    val rankingKey = RankingKey(policy.id, policy.version, ExactQualifier(TimedQualifier(IntervalUnit.TOTAL, 0L), Map.empty[String, String]))
+//    "get top k" >> {
+//      val result = rankingCounterV1.getTopK(rankingKey, 100)
+//
+//      println(result)
+//
+//      result must not be empty
+//    }
+//
+//    "get and increment" >> {
+//      val result = rankingCounterV1.getTopK(rankingKey, 100).get
+//
+//      val value = 2d
+//      val contents = {
+//        for {
+//          (item, score) <- result.values
+//        } yield {
+//          item -> RankingValue(score + value, value)
+//        }
+//      }.toMap
+//      rankingCounterV1.update(rankingKey, contents, 100)
+//
+//      val result2 = rankingCounterV1.getTopK(rankingKey, 100).get
+//
+//      result2.totalScore must_== result.totalScore + contents.values.map(_.increment).sum
+//      result2.values must containTheSameElementsAs(result.values.map { case (k, v) => (k, v + value) })
+//    }
+//  }
+
+  val service = "test"
+  val action = "test_case"
+
+  override def beforeAll: Unit = {
+    Try {
+      Retry(3) {
+        admin.setupCounterOnGraph
+      }
+
+      val graphOp = new GraphOperation(config)
+      val graph = new Graph(config)(scala.concurrent.ExecutionContext.global)
+      val management = new Management(graph)
+      management.createService(service, s2config.HBASE_ZOOKEEPER_QUORUM, s"${service}_dev", 1, None, "gz")
+      val strJs =
+        s"""
+           |{
+           |  "label": "$action",
+           |  "srcServiceName": "$service",
+           |  "srcColumnName": "src",
+           |  "srcColumnType": "string",
+           |  "tgtServiceName": "$service",
+           |  "tgtColumnName": "$action",
+           |  "tgtColumnType": "string",
+           |  "indices": [
+           |  ],
+           |  "props": [
+           |  ]
+           |}
+       """.stripMargin
+      Retry(3) {
+        if (Label.findByName(action).isEmpty) {
+          graphOp.createLabel(Json.parse(strJs))
+        }
+      }
+
+      admin.deleteCounter(service, action).foreach {
+        case Failure(ex) =>
+          println(s"$ex")
+          throw ex
+        case Success(v) =>
+      }
+      admin.createCounter(Counter(useFlag = true, 2, service, action, Counter.ItemType.STRING, autoComb = true, "", useRank = true))
+    } match {
+      case Failure(ex) =>
+        println(s"$ex")
+      case Success(_) =>
+    }
+  }
+
+  override def afterAll: Unit = {
+    admin.deleteCounter(service, action)
+  }
+
+  "RankingCounterV2" >> {
+    "get top k" >> {
+      val policy = counterModel.findByServiceAction(service, action, useCache = true).get
+
+      val rankingKey = RankingKey(policy.id, policy.version, ExactQualifier(TimedQualifier(IntervalUnit.TOTAL, 0L), Map.empty[String, String]))
+
+      val orgMap = Map(
+        "1" -> 1d,
+        "2" -> 2d,
+        "3" -> 3d,
+        "4" -> 4d,
+        "5" -> 5d,
+        "6" -> 6d,
+        "7" -> 7d,
+        "8" -> 8d,
+        "9" -> 9d,
+        "10" -> 10d,
+        "11" -> 11d,
+        "12" -> 12d,
+        "13" -> 13d,
+        "14" -> 14d,
+        "15" -> 15d,
+        "16" -> 16d,
+        "17" -> 17d,
+        "18" -> 18d,
+        "19" -> 19d,
+        "20" -> 20d,
+        "100" -> 100d
+      )
+
+      val valueMap = Random.shuffle(orgMap).toMap
+
+      val predictResult = valueMap.toSeq.sortBy(-_._2)
+
+      val rvMap = valueMap.map { case (k, score) =>
+        k -> RankingValue(score, 0)
+      }
+      Try {
+        rankingCounterV2.update(rankingKey, rvMap, 100)
+      }.isSuccess must_== true
+
+      Thread.sleep(1000)
+
+      val result : RankingResult = rankingCounterV2.getTopK(rankingKey, 10).get
+
+      println(result.values)
+
+      result must not be empty
+      result.values must have size 10
+      result.values must_== predictResult.take(10)
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2counter_core/src/test/scala/org/apache/s2graph/counter/models/CounterModelSpec.scala
----------------------------------------------------------------------
diff --git 
a/s2counter_core/src/test/scala/org/apache/s2graph/counter/models/CounterModelSpec.scala
 
b/s2counter_core/src/test/scala/org/apache/s2graph/counter/models/CounterModelSpec.scala
new file mode 100644
index 0000000..a52b8f9
--- /dev/null
+++ 
b/s2counter_core/src/test/scala/org/apache/s2graph/counter/models/CounterModelSpec.scala
@@ -0,0 +1,50 @@
+package org.apache.s2graph.counter.models
+
+import com.typesafe.config.ConfigFactory
+import org.specs2.mutable.Specification
+
+class CounterModelSpec extends Specification {
+  val config = ConfigFactory.load()
+
+  DBModel.initialize(config)
+
+  "CounterModel" should {
+    val model = new CounterModel(config)
+    "findById" in {
+      model.findById(0, useCache = false) must beNone
+    }
+
+    "findByServiceAction using cache" in {
+      val service = "test"
+      val action = "test_action"
+      val counter = Counter(useFlag = true, 2, service, action, Counter.ItemType.STRING,
+        autoComb = true, "", useProfile = true, None, useRank = true, 0, None, None, None, None, None, None)
+      model.createServiceAction(counter)
+      model.findByServiceAction(service, action, useCache = false) must beSome
+      val opt = model.findByServiceAction(service, action, useCache = true)
+      opt must beSome
+      model.findById(opt.get.id) must beSome
+      model.deleteServiceAction(opt.get)
+      model.findById(opt.get.id) must beSome
+      model.findById(opt.get.id, useCache = false) must beNone
+    }
+
+    "create and delete policy" in {
+      val (service, action) = ("test", "test_case")
+      for {
+        policy <- model.findByServiceAction(service, action, useCache = false)
+      } {
+        model.deleteServiceAction(policy)
+      }
+      model.createServiceAction(Counter(useFlag = true, 2, service, action, Counter.ItemType.STRING,
+        autoComb = true, "", useProfile = true, None, useRank = true, 0, None, None, None, None, None, None))
+      model.findByServiceAction(service, action, useCache = false).map { policy =>
+        policy.service mustEqual service
+        policy.action mustEqual action
+        model.deleteServiceAction(policy)
+        policy
+      } must beSome
+      model.findByServiceAction(service, action, useCache = false) must beNone
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2counter_core/src/test/scala/org/apache/s2graph/counter/models/CounterSpec.scala
----------------------------------------------------------------------
diff --git 
a/s2counter_core/src/test/scala/org/apache/s2graph/counter/models/CounterSpec.scala
 
b/s2counter_core/src/test/scala/org/apache/s2graph/counter/models/CounterSpec.scala
new file mode 100644
index 0000000..220c30f
--- /dev/null
+++ 
b/s2counter_core/src/test/scala/org/apache/s2graph/counter/models/CounterSpec.scala
@@ -0,0 +1,33 @@
+package org.apache.s2graph.counter.models
+
+import org.apache.s2graph.counter.models.Counter.ItemType
+import org.specs2.mutable.Specification
+
+class CounterSpec extends Specification {
+  "Counter" should {
+    "dimension auto combination" in {
+      val policy = Counter(
+        useFlag = true,
+        2,
+        "test",
+        "test_case",
+        ItemType.LONG,
+        autoComb = true,
+        "p1,p2,p3",
+        useProfile = false,
+        None,
+        useRank = true,
+        0,
+        None,
+        None,
+        None,
+        None,
+        None,
+        None
+      )
+
+      policy.dimensionSp mustEqual Array("p1", "p2", "p3")
+      policy.dimensionList.map { arr => arr.toSeq }.toSet -- Set(Seq.empty[String], Seq("p1"), Seq("p2"), Seq("p3"), Seq("p1", "p2"), Seq("p1", "p3"), Seq("p2", "p3"), Seq("p1", "p2", "p3")) must beEmpty
+    }
+  }
+}
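
The assertion above expects autoComb to expand "p1,p2,p3" into every subset of the dimension keys. A standalone sketch of that expansion (the same combinations(i) pattern appears in EraseDailyCounter later in this commit):

val dims = Seq("p1", "p2", "p3")
val allCombinations =
  (0 to dims.length).flatMap(i => dims.combinations(i)) // 2^3 = 8 subsets, empty one included
println(allCombinations.toList)
// List(List(), List(p1), List(p2), List(p3), List(p1, p2), List(p1, p3), List(p2, p3), List(p1, p2, p3))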

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2counter_core/src/test/scala/s2/counter/core/RankingCounterSpec.scala
----------------------------------------------------------------------
diff --git 
a/s2counter_core/src/test/scala/s2/counter/core/RankingCounterSpec.scala 
b/s2counter_core/src/test/scala/s2/counter/core/RankingCounterSpec.scala
deleted file mode 100644
index ea67a2a..0000000
--- a/s2counter_core/src/test/scala/s2/counter/core/RankingCounterSpec.scala
+++ /dev/null
@@ -1,169 +0,0 @@
-package s2.counter.core
-
-import com.kakao.s2graph.core.{Management, Graph}
-import com.kakao.s2graph.core.mysqls.Label
-import com.typesafe.config.ConfigFactory
-import org.specs2.mutable.Specification
-import org.specs2.specification.BeforeAfterAll
-import play.api.libs.json.Json
-import s2.config.S2CounterConfig
-import s2.counter.core.TimedQualifier.IntervalUnit
-import s2.counter.core.v2.{GraphOperation, RankingStorageGraph}
-import s2.helper.CounterAdmin
-import s2.models.{Counter, CounterModel, DBModel}
-import s2.util.Retry
-
-import scala.util.{Failure, Random, Success, Try}
-
-/**
- * Created by hsleep([email protected]) on 15. 6. 19..
- */
-class RankingCounterSpec extends Specification with BeforeAfterAll {
-  val config = ConfigFactory.load()
-  DBModel.initialize(config)
-  val counterModel = new CounterModel(config)
-  val admin = new CounterAdmin(config)
-
-  val s2config = new S2CounterConfig(config)
-
-//  val rankingCounterV1 = new RankingCounter(config, new RankingStorageV1(config))
-//  val rankingCounterV2 = new RankingCounter(config, new RankingStorageV2(config))
-  val rankingCounterV2 = new RankingCounter(config, new RankingStorageGraph(config))
-
-//  "RankingCounterV1" >> {
-//    val policy = counterModel.findByServiceAction("test", "test_action", useCache = false).get
-//    val rankingKey = RankingKey(policy.id, policy.version, ExactQualifier(TimedQualifier(IntervalUnit.TOTAL, 0L), Map.empty[String, String]))
-//    "get top k" >> {
-//      val result = rankingCounterV1.getTopK(rankingKey, 100)
-//
-//      println(result)
-//
-//      result must not be empty
-//    }
-//
-//    "get and increment" >> {
-//      val result = rankingCounterV1.getTopK(rankingKey, 100).get
-//
-//      val value = 2d
-//      val contents = {
-//        for {
-//          (item, score) <- result.values
-//        } yield {
-//          item -> RankingValue(score + value, value)
-//        }
-//      }.toMap
-//      rankingCounterV1.update(rankingKey, contents, 100)
-//
-//      val result2 = rankingCounterV1.getTopK(rankingKey, 100).get
-//
-//      result2.totalScore must_== result.totalScore + contents.values.map(_.increment).sum
-//      result2.values must containTheSameElementsAs(result.values.map { case (k, v) => (k, v + value) })
-//    }
-//  }
-
-  val service = "test"
-  val action = "test_case"
-
-  override def beforeAll: Unit = {
-    Try {
-      Retry(3) {
-        admin.setupCounterOnGraph
-      }
-
-      val graphOp = new GraphOperation(config)
-      val graph = new Graph(config)(scala.concurrent.ExecutionContext.global)
-      val management = new Management(graph)
-      management.createService(service, s2config.HBASE_ZOOKEEPER_QUORUM, s"${service}_dev", 1, None, "gz")
-      val strJs =
-        s"""
-           |{
-           |  "label": "$action",
-           |  "srcServiceName": "$service",
-           |  "srcColumnName": "src",
-           |  "srcColumnType": "string",
-           |  "tgtServiceName": "$service",
-           |  "tgtColumnName": "$action",
-           |  "tgtColumnType": "string",
-           |  "indices": [
-           |  ],
-           |  "props": [
-           |  ]
-           |}
-       """.stripMargin
-      Retry(3) {
-        if (Label.findByName(action).isEmpty) {
-          graphOp.createLabel(Json.parse(strJs))
-        }
-      }
-
-      admin.deleteCounter(service, action).foreach {
-        case Failure(ex) =>
-          println(s"$ex")
-          throw ex
-        case Success(v) =>
-      }
-      admin.createCounter(Counter(useFlag = true, 2, service, action, Counter.ItemType.STRING, autoComb = true, "", useRank = true))
-    } match {
-      case Failure(ex) =>
-        println(s"$ex")
-      case Success(_) =>
-    }
-  }
-
-  override def afterAll: Unit = {
-    admin.deleteCounter(service, action)
-  }
-
-  "RankingCounterV2" >> {
-    "get top k" >> {
-      val policy = counterModel.findByServiceAction(service, action, useCache = true).get
-
-      val rankingKey = RankingKey(policy.id, policy.version, ExactQualifier(TimedQualifier(IntervalUnit.TOTAL, 0L), Map.empty[String, String]))
-
-      val orgMap = Map(
-        "1" -> 1d,
-        "2" -> 2d,
-        "3" -> 3d,
-        "4" -> 4d,
-        "5" -> 5d,
-        "6" -> 6d,
-        "7" -> 7d,
-        "8" -> 8d,
-        "9" -> 9d,
-        "10" -> 10d,
-        "11" -> 11d,
-        "12" -> 12d,
-        "13" -> 13d,
-        "14" -> 14d,
-        "15" -> 15d,
-        "16" -> 16d,
-        "17" -> 17d,
-        "18" -> 18d,
-        "19" -> 19d,
-        "20" -> 20d,
-        "100" -> 100d
-      )
-
-      val valueMap = Random.shuffle(orgMap).toMap
-
-      val predictResult = valueMap.toSeq.sortBy(-_._2)
-
-      val rvMap = valueMap.map { case (k, score) =>
-        k -> RankingValue(score, 0)
-      }
-      Try {
-        rankingCounterV2.update(rankingKey, rvMap, 100)
-      }.isSuccess must_== true
-
-      Thread.sleep(1000)
-
-      val result : RankingResult = rankingCounterV2.getTopK(rankingKey, 10).get
-
-      println(result.values)
-
-      result must not be empty
-      result.values must have size 10
-      result.values must_== predictResult.take(10)
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2counter_core/src/test/scala/s2/models/CounterModelSpec.scala
----------------------------------------------------------------------
diff --git a/s2counter_core/src/test/scala/s2/models/CounterModelSpec.scala 
b/s2counter_core/src/test/scala/s2/models/CounterModelSpec.scala
deleted file mode 100644
index dff55ef..0000000
--- a/s2counter_core/src/test/scala/s2/models/CounterModelSpec.scala
+++ /dev/null
@@ -1,53 +0,0 @@
-package s2.models
-
-import com.typesafe.config.ConfigFactory
-import org.specs2.mutable.Specification
-
-/**
- * Created by hsleep([email protected]) on 15. 5. 26..
- */
-class CounterModelSpec extends Specification {
-  val config = ConfigFactory.load()
-
-  DBModel.initialize(config)
-
-  "CounterModel" should {
-    val model = new CounterModel(config)
-    "findById" in {
-      model.findById(0, useCache = false) must beNone
-    }
-
-    "findByServiceAction using cache" in {
-      val service = "test"
-      val action = "test_action"
-      val counter = Counter(useFlag = true, 2, service, action, Counter.ItemType.STRING,
-        autoComb = true, "", useProfile = true, None, useRank = true, 0, None, None, None, None, None, None)
-      model.createServiceAction(counter)
-      model.findByServiceAction(service, action, useCache = false) must beSome
-      val opt = model.findByServiceAction(service, action, useCache = true)
-      opt must beSome
-      model.findById(opt.get.id) must beSome
-      model.deleteServiceAction(opt.get)
-      model.findById(opt.get.id) must beSome
-      model.findById(opt.get.id, useCache = false) must beNone
-    }
-
-    "create and delete policy" in {
-      val (service, action) = ("test", "test_case")
-      for {
-        policy <- model.findByServiceAction(service, action, useCache = false)
-      } {
-        model.deleteServiceAction(policy)
-      }
-      model.createServiceAction(Counter(useFlag = true, 2, service, action, Counter.ItemType.STRING,
-        autoComb = true, "", useProfile = true, None, useRank = true, 0, None, None, None, None, None, None))
-      model.findByServiceAction(service, action, useCache = false).map { policy =>
-        policy.service mustEqual service
-        policy.action mustEqual action
-        model.deleteServiceAction(policy)
-        policy
-      } must beSome
-      model.findByServiceAction(service, action, useCache = false) must beNone
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2counter_core/src/test/scala/s2/models/CounterSpec.scala
----------------------------------------------------------------------
diff --git a/s2counter_core/src/test/scala/s2/models/CounterSpec.scala 
b/s2counter_core/src/test/scala/s2/models/CounterSpec.scala
deleted file mode 100644
index a03c70e..0000000
--- a/s2counter_core/src/test/scala/s2/models/CounterSpec.scala
+++ /dev/null
@@ -1,36 +0,0 @@
-package s2.models
-
-import org.specs2.mutable.Specification
-import s2.models.Counter.ItemType
-
-/**
- * Created by hsleep([email protected]) on 15. 6. 11..
- */
-class CounterSpec extends Specification {
-  "Counter" should {
-    "dimension auto combination" in {
-      val policy = Counter(
-        useFlag = true,
-        2,
-        "test",
-        "test_case",
-        ItemType.LONG,
-        autoComb = true,
-        "p1,p2,p3",
-        useProfile = false,
-        None,
-        useRank = true,
-        0,
-        None,
-        None,
-        None,
-        None,
-        None,
-        None
-      )
-
-      policy.dimensionSp mustEqual Array("p1", "p2", "p3")
-      policy.dimensionList.map { arr => arr.toSeq }.toSet -- Set(Seq.empty[String], Seq("p1"), Seq("p2"), Seq("p3"), Seq("p1", "p2"), Seq("p1", "p3"), Seq("p2", "p3"), Seq("p1", "p2", "p3")) must beEmpty
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/CounterBulkLoader.scala
----------------------------------------------------------------------
diff --git 
a/s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/CounterBulkLoader.scala
 
b/s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/CounterBulkLoader.scala
new file mode 100644
index 0000000..64da217
--- /dev/null
+++ 
b/s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/CounterBulkLoader.scala
@@ -0,0 +1,78 @@
+package org.apache.s2graph.counter.loader
+
+import org.apache.s2graph.core.GraphUtil
+import org.apache.s2graph.counter.config.S2CounterConfig
+import org.apache.s2graph.counter.core.BlobExactKey
+import org.apache.s2graph.counter.loader.config.StreamingConfig
+import org.apache.s2graph.counter.loader.core.{CounterFunctions, CounterEtlFunctions}
+import org.apache.s2graph.counter.models.Counter.ItemType
+import org.apache.s2graph.counter.models.{DBModel, CounterModel}
+import org.apache.s2graph.spark.config.S2ConfigFactory
+import org.apache.s2graph.spark.spark.{HashMapParam, SparkApp, WithKafka}
+import org.apache.spark.SparkContext
+
+import scala.collection.mutable.{HashMap => MutableHashMap}
+import scala.concurrent.ExecutionContext
+
+object CounterBulkLoader extends SparkApp with WithKafka {
+   lazy val config = S2ConfigFactory.config
+   lazy val s2Config = new S2CounterConfig(config)
+   lazy val counterModel = new CounterModel(config)
+   lazy val className = getClass.getName.stripSuffix("$")
+   lazy val producer = getProducer[String, String](StreamingConfig.KAFKA_BROKERS)
+
+   implicit val ec = ExecutionContext.Implicits.global
+
+   val initialize = {
+     println("initialize")
+ //    Graph(config)
+     DBModel.initialize(config)
+     true
+   }
+
+   override def run(): Unit = {
+     val hdfsPath = args(0)
+     val blockSize = args(1).toInt
+     val minPartitions = args(2).toInt
+     val conf = sparkConf(s"$hdfsPath: CounterBulkLoader")
+
+     val sc = new SparkContext(conf)
+     val acc = sc.accumulable(MutableHashMap.empty[String, Long], "Throughput")(HashMapParam[String, Long](_ + _))
+
+     val msgs = sc.textFile(hdfsPath)
+
+     val etlRdd = msgs.repartition(minPartitions).mapPartitions { part =>
+       // parse and etl
+       assert(initialize)
+       val items = {
+         for {
+           msg <- part
+           line <- GraphUtil.parseString(msg)
+           sp = GraphUtil.split(line) if sp.size <= 7 || GraphUtil.split(line)(7) != "in"
+           item <- CounterEtlFunctions.parseEdgeFormat(line)
+         } yield {
+           acc +=("Edges", 1)
+           item
+         }
+       }
+       items.grouped(blockSize).flatMap { grouped =>
+         grouped.groupBy(e => (e.service, e.action)).flatMap { case ((service, action), v) =>
+           CounterEtlFunctions.checkPolicyAndMergeDimension(service, action, v.toList)
+         }
+       }
+     }
+
+     val exactRdd = CounterFunctions.exactCountFromEtl(etlRdd, etlRdd.partitions.length)
+     val logRdd = exactRdd.mapPartitions { part =>
+       val seq = part.toSeq
+       CounterFunctions.insertBlobValue(seq.map(_._1).filter(_.itemType == ItemType.BLOB).map(_.asInstanceOf[BlobExactKey]), acc)
+       // update exact counter
+       CounterFunctions.updateExactCounter(seq, acc).toIterator
+     }
+
+     val rankRdd = CounterFunctions.makeRankingRddFromTrxLog(logRdd, logRdd.partitions.length)
+     rankRdd.foreachPartition { part =>
+       CounterFunctions.updateRankingCounter(part, acc)
+     }
+   }
+ }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/EraseDailyCounter.scala
----------------------------------------------------------------------
diff --git 
a/s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/EraseDailyCounter.scala
 
b/s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/EraseDailyCounter.scala
new file mode 100644
index 0000000..8bdc5ba
--- /dev/null
+++ 
b/s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/EraseDailyCounter.scala
@@ -0,0 +1,133 @@
+package org.apache.s2graph.counter.loader
+
+import java.text.SimpleDateFormat
+
+import kafka.producer.KeyedMessage
+import org.apache.s2graph.counter.core.ExactCounter.ExactValueMap
+import org.apache.s2graph.counter.core.v1.ExactStorageHBase
+import org.apache.s2graph.counter.core.v2.ExactStorageGraph
+import org.apache.s2graph.counter.core._
+import org.apache.s2graph.counter._
+import org.apache.s2graph.counter.loader.config.StreamingConfig
+import org.apache.s2graph.counter.loader.core.CounterEtlItem
+import org.apache.s2graph.counter.models.{CounterModel, DBModel, Counter}
+import org.apache.s2graph.spark.config.S2ConfigFactory
+import org.apache.s2graph.spark.spark.{SparkApp, WithKafka}
+import org.apache.spark.SparkContext
+import org.apache.spark.rdd.RDD
+import play.api.libs.json.Json
+import scala.collection.mutable
+import scala.collection.mutable.{HashMap => MutableHashMap}
+import scala.concurrent.ExecutionContext
+
+object EraseDailyCounter extends SparkApp with WithKafka {
+   implicit val ec = ExecutionContext.Implicits.global
+
+
+   lazy val producer = getProducer[String, String](StreamingConfig.KAFKA_BROKERS)
+   def valueToEtlItem(policy: Counter, key: ExactKeyTrait, values: ExactValueMap): Seq[CounterEtlItem] = {
+     if (values.nonEmpty) {
+       for {
+         (eq, value) <- filter(values.toList)
+       } yield {
+         CounterEtlItem(eq.tq.ts, policy.service, policy.action, key.itemKey, Json.toJson(eq.dimKeyValues), Json.toJson(Map("value" -> -value)))
+       }
+     } else {
+       Nil
+     }
+   }
+
+   def filter(values: List[(ExactQualifier, Long)]): List[(ExactQualifier, Long)] = {
+     val sorted = values.sortBy(_._1.dimKeyValues.size).reverse
+     val (eq, value) = sorted.head
+     val dimKeys = eq.dimKeyValues.toSeq
+     val flat = {
+       for {
+         i <- 0 to dimKeys.length
+         comb <- dimKeys.combinations(i)
+       } yield {
+         ExactQualifier(eq.tq, comb.toMap) -> value
+       }
+     }.toMap
+
+ //    println("flat >>>", flat)
+
+     val valuesMap = values.toMap
+     val remain = (valuesMap ++ flat.map { case (k, v) =>
+       k -> (valuesMap(k) - v)
+     }).filter(_._2 > 0).toList
+
+ //    println("remain >>>", remain)
+
+     if (remain.isEmpty) {
+       List((eq, value))
+     } else {
+       (eq, value) :: filter(remain)
+     }
+   }
+
+   def produce(policy: Counter, exactRdd: RDD[(ExactKeyTrait, ExactValueMap)]): Unit = {
+     exactRdd.mapPartitions { part =>
+       for {
+         (key, values) <- part
+         item <- valueToEtlItem(policy, key, values)
+       } yield {
+         item
+       }
+     }.foreachPartition { part =>
+       val m = MutableHashMap.empty[Int, mutable.MutableList[CounterEtlItem]]
+       part.foreach { item =>
+         val k = getPartKey(item.item, 20)
+         val values = m.getOrElse(k, mutable.MutableList.empty[CounterEtlItem])
+         values += item
+         m.update(k, values)
+       }
+       m.foreach { case (k, v) =>
+         v.map(_.toKafkaMessage).grouped(1000).foreach { grouped =>
+ //          println(grouped)
+           producer.send(new KeyedMessage[String, String](StreamingConfig.KAFKA_TOPIC_COUNTER, null, k, grouped.mkString("\n")))
+         }
+       }
+     }
+   }
+
+   def rddToExactRdd(policy: Counter, date: String, rdd: RDD[String]): RDD[(ExactKeyTrait, ExactValueMap)] = {
+     val dateFormat = new SimpleDateFormat("yyyy-MM-dd")
+     val fromTs = dateFormat.parse(date).getTime
+     val toTs = fromTs + 23 * 60 * 60 * 1000
+
+     rdd.mapPartitions { part =>
+       val exactCounter = policy.version match {
+         case VERSION_1 => new ExactCounter(S2ConfigFactory.config, new ExactStorageHBase(S2ConfigFactory.config))
+         case VERSION_2 => new ExactCounter(S2ConfigFactory.config, new ExactStorageGraph(S2ConfigFactory.config))
+       }
+
+       for {
+         line <- part
+         FetchedCounts(exactKey, qualifierWithCountMap) <- exactCounter.getCount(policy, line, Array(TimedQualifier.IntervalUnit.DAILY), fromTs, toTs)
+       } yield {
+         (exactKey, qualifierWithCountMap)
+       }
+     }
+   }
+
+   lazy val className = getClass.getName.stripSuffix("$")
+
+   override def run(): Unit = {
+     validateArgument("service", "action", "date", "file", "op")
+     DBModel.initialize(S2ConfigFactory.config)
+
+     val (service, action, date, file, op) = (args(0), args(1), args(2), args(3), args(4))
+     val conf = sparkConf(s"$className: $service.$action")
+
+     val ctx = new SparkContext(conf)
+
+     val rdd = ctx.textFile(file, 20)
+
+     val counterModel = new CounterModel(S2ConfigFactory.config)
+
+     val policy = counterModel.findByServiceAction(service, action).get
+     val exactRdd = rddToExactRdd(policy, date, rdd)
+     produce(policy, exactRdd)
+   }
+ }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/config/StreamingConfig.scala
----------------------------------------------------------------------
diff --git 
a/s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/config/StreamingConfig.scala
 
b/s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/config/StreamingConfig.scala
new file mode 100644
index 0000000..8c9a8dd
--- /dev/null
+++ 
b/s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/config/StreamingConfig.scala
@@ -0,0 +1,24 @@
+package org.apache.s2graph.counter.loader.config
+
+import org.apache.s2graph.counter.config.ConfigFunctions
+import org.apache.s2graph.spark.config.S2ConfigFactory
+
+object StreamingConfig extends ConfigFunctions(S2ConfigFactory.config) {
+  // kafka
+  val KAFKA_ZOOKEEPER = getOrElse("kafka.zookeeper", "localhost")
+  val KAFKA_BROKERS = getOrElse("kafka.brokers", "localhost")
+  val KAFKA_TOPIC_GRAPH = getOrElse("kafka.topic.graph", "s2graphInalpha")
+  val KAFKA_TOPIC_ETL = getOrElse("kafka.topic.etl", "s2counter-etl-alpha")
+  val KAFKA_TOPIC_COUNTER = getOrElse("kafka.topic.counter", "s2counter-alpha")
+  val KAFKA_TOPIC_COUNTER_TRX = getOrElse("kafka.topic.counter-trx", "s2counter-trx-alpha")
+  val KAFKA_TOPIC_COUNTER_FAIL = getOrElse("kafka.topic.counter-fail", "s2counter-fail-alpha")
+
+  // profile cache
+  val PROFILE_CACHE_TTL_SECONDS = getOrElse("profile.cache.ttl.seconds", 60 * 60 * 24)    // default 1 day
+  val PROFILE_CACHE_MAX_SIZE = getOrElse("profile.cache.max.size", 10000)
+  val PROFILE_PREFETCH_SIZE = getOrElse("profile.prefetch.size", 10)
+
+  // graph url
+  val GRAPH_URL = getOrElse("s2graph.url", "")
+  val GRAPH_READONLY_URL = getOrElse("s2graph.read-only.url", GRAPH_URL)
+}
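
ConfigFunctions is defined elsewhere in this repo; below is a minimal, hypothetical sketch of the getOrElse pattern used above, assuming it simply falls back to the supplied default when the key is missing from the Typesafe config.

import com.typesafe.config.{Config, ConfigFactory}

// Hypothetical stand-in for ConfigFunctions, for illustration only.
class ConfigDefaults(config: Config) {
  def getOrElse(key: String, default: String): String =
    if (config.hasPath(key)) config.getString(key) else default

  def getOrElse(key: String, default: Int): Int =
    if (config.hasPath(key)) config.getInt(key) else default
}

object StreamingConfigSketch extends App {
  private val conf = new ConfigDefaults(ConfigFactory.load())
  println(conf.getOrElse("kafka.brokers", "localhost"))              // broker list or fallback
  println(conf.getOrElse("profile.cache.ttl.seconds", 60 * 60 * 24)) // default: one day
}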

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/core/CounterEtlFunctions.scala
----------------------------------------------------------------------
diff --git 
a/s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/core/CounterEtlFunctions.scala
 
b/s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/core/CounterEtlFunctions.scala
new file mode 100644
index 0000000..3e80e25
--- /dev/null
+++ 
b/s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/core/CounterEtlFunctions.scala
@@ -0,0 +1,86 @@
+package org.apache.s2graph.counter.loader.core
+
+import org.apache.s2graph.core.{Edge, Graph, GraphUtil}
+import org.apache.s2graph.counter.loader.config.StreamingConfig
+import org.apache.s2graph.counter.models.CounterModel
+import org.apache.s2graph.spark.config.S2ConfigFactory
+import org.apache.spark.Logging
+import play.api.libs.json._
+import scala.collection.mutable.{HashMap => MutableHashMap}
+
+object CounterEtlFunctions extends Logging {
+  lazy val filterOps = Seq("insert", "insertBulk", "update", "increment").map(op => GraphUtil.operations(op))
+  lazy val preFetchSize = StreamingConfig.PROFILE_PREFETCH_SIZE
+  lazy val config = S2ConfigFactory.config
+  lazy val counterModel = new CounterModel(config)
+
+  def logToEdge(line: String): Option[Edge] = {
+    for {
+      elem <- Graph.toGraphElement(line) if elem.isInstanceOf[Edge]
+      edge <- Some(elem.asInstanceOf[Edge]).filter { x =>
+        filterOps.contains(x.op)
+      }
+    } yield {
+      edge
+    }
+  }
+
+  def parseEdgeFormat(line: String): Option[CounterEtlItem] = {
+    /**
+     * 1427082276804   insert  edge    19073318        52453027_93524145648511699      story_user_ch_doc_view  {"doc_type" : "l", "channel_subscribing" : "y", "view_from" : "feed"}
+     */
+    for {
+      elem <- Graph.toGraphElement(line) if elem.isInstanceOf[Edge]
+      edge <- Some(elem.asInstanceOf[Edge]).filter { x =>
+        filterOps.contains(x.op)
+      }
+    } yield {
+      val label = edge.label
+      val labelName = label.label
+      val tgtService = label.tgtColumn.service.serviceName
+      val tgtId = edge.tgtVertex.innerId.toString()
+      val srcId = edge.srcVertex.innerId.toString()
+
+      // fall back to an empty JSON object when the edge log line carries no property field
+      val dimension = Json.parse(Some(GraphUtil.split(line)).filter(_.length >= 7).map(_(6)).getOrElse("{}"))
+      val bucketKeys = Seq("_from")
+      val bucketKeyValues = {
+        for {
+          variable <- bucketKeys
+        } yield {
+          val jsValue = variable match {
+            case "_from" => JsString(srcId)
+            case s => dimension \ s
+          }
+          s"[[$variable]]" -> jsValue
+        }
+      }
+      val property = Json.toJson(bucketKeyValues :+ ("value" -> JsString("1")) toMap)
+//      val property = Json.toJson(Map("_from" -> srcId, "_to" -> tgtId, "value" -> "1"))
+
+      CounterEtlItem(edge.ts, tgtService, labelName, tgtId, dimension, property)
+    }
+  }
+
+  def parseEdgeFormat(lines: List[String]): List[CounterEtlItem] = {
+    for {
+      line <- lines
+      item <- parseEdgeFormat(line)
+    } yield {
+      item
+    }
+  }
+  
+  def checkPolicyAndMergeDimension(service: String, action: String, items: List[CounterEtlItem]): List[CounterEtlItem] = {
+    counterModel.findByServiceAction(service, action).map { policy =>
+      if (policy.useProfile) {
+        policy.bucketImpId match {
+          case Some(_) => DimensionProps.mergeDimension(policy, items)
+          case None => Nil
+        }
+      } else {
+        items
+      }
+    }.getOrElse(Nil)
+  }
+}
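
A usage sketch for parseEdgeFormat. It needs S2Graph metadata to be reachable and the label in the sample line to exist, so this is illustrative rather than a unit test: one TSV edge log line becomes a CounterEtlItem keyed by the target vertex.

// Assumed setup: Graph/label metadata for "story_user_ch_doc_view" is loadable.
val line = Seq(
  "1427082276804", "insert", "edge",
  "19073318", "52453027_93524145648511699", "story_user_ch_doc_view",
  """{"doc_type": "l", "view_from": "feed"}"""
).mkString("\t")

CounterEtlFunctions.parseEdgeFormat(line).foreach { item =>
  // service = target column's service, action = label name, item = target vertex id
  println(item.toKafkaMessage)
}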

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/core/CounterEtlItem.scala
----------------------------------------------------------------------
diff --git 
a/s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/core/CounterEtlItem.scala
 
b/s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/core/CounterEtlItem.scala
new file mode 100644
index 0000000..7a1ebb7
--- /dev/null
+++ 
b/s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/core/CounterEtlItem.scala
@@ -0,0 +1,39 @@
+package org.apache.s2graph.counter.loader.core
+
+import org.apache.s2graph.counter.util.UnitConverter
+import org.slf4j.LoggerFactory
+import play.api.libs.json._
+import scala.util.{Failure, Success, Try}
+
+case class CounterEtlItem(ts: Long, service: String, action: String, item: String, dimension: JsValue, property: JsValue, useProfile: Boolean = false) {
+   def toKafkaMessage: String = {
+     s"$ts\t$service\t$action\t$item\t${dimension.toString()}\t${property.toString()}"
+   }
+
+   lazy val value = {
+     property \ "value" match {
+       case JsNumber(n) => n.longValue()
+       case JsString(s) => s.toLong
+       case _: JsUndefined => 1L
+       case _ => throw new Exception("wrong type")
+     }
+   }
+ }
+
+object CounterEtlItem {
+   val log = LoggerFactory.getLogger(this.getClass)
+
+   def apply(line: String): Option[CounterEtlItem] = {
+     Try {
+       val Array(ts, service, action, item, dimension, property) = line.split('\t')
+       CounterEtlItem(UnitConverter.toMillis(ts.toLong), service, action, item, Json.parse(dimension), Json.parse(property))
+     } match {
+       case Success(item) =>
+         Some(item)
+       case Failure(ex) =>
+         log.error(">>> failed")
+         log.error(s"${ex.toString}: $line")
+         None
+     }
+   }
+ }
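
A round-trip sketch for the tab-separated payload this case class models: toKafkaMessage writes six tab-delimited fields, apply(line) parses them back (returning None and logging on malformed input), and the count defaults to 1 when the property JSON has no "value" field.

import play.api.libs.json.Json

val item = CounterEtlItem(
  ts = System.currentTimeMillis(),
  service = "test", action = "test_case", item = "item_1",
  dimension = Json.obj("country" -> "kr"),
  property = Json.obj("value" -> "2"))

val line = item.toKafkaMessage        // "1427...\ttest\ttest_case\titem_1\t{...}\t{...}"
val parsed = CounterEtlItem(line)     // Some(CounterEtlItem(...)); parsed.get.value == 2L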

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/core/CounterFunctions.scala
----------------------------------------------------------------------
diff --git 
a/s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/core/CounterFunctions.scala
 
b/s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/core/CounterFunctions.scala
new file mode 100644
index 0000000..59ac841
--- /dev/null
+++ 
b/s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/core/CounterFunctions.scala
@@ -0,0 +1,446 @@
+package org.apache.s2graph.counter.loader.core
+
+import kafka.producer.KeyedMessage
+import org.apache.s2graph.core.GraphUtil
+import org.apache.s2graph.counter.TrxLog
+import org.apache.s2graph.counter.core.ExactCounter.ExactValueMap
+import org.apache.s2graph.counter.core.RankingCounter.RankingValueMap
+import org.apache.s2graph.counter.core.TimedQualifier.IntervalUnit
+import org.apache.s2graph.counter.core._
+import org.apache.s2graph.counter.core.v2.{RankingStorageGraph, ExactStorageGraph}
+import org.apache.s2graph.counter.loader.config.StreamingConfig
+import org.apache.s2graph.counter.loader.models.DefaultCounterModel
+import org.apache.s2graph.counter.models.{Counter, DBModel}
+import org.apache.s2graph.spark.config.S2ConfigFactory
+import org.apache.s2graph.spark.spark.WithKafka
+import org.apache.spark.rdd.RDD
+import org.apache.spark.{Accumulable, Logging}
+import play.api.libs.json.{JsNumber, JsString, JsValue, Json}
+import scala.collection.mutable.{HashMap => MutableHashMap}
+import scala.concurrent.ExecutionContext
+import scala.language.postfixOps
+import scala.util.Try
+
+object CounterFunctions extends Logging with WithKafka {
+
+  private val K_MAX = 500
+  implicit val ec = ExecutionContext.Implicits.global
+
+  val exactCounter = new ExactCounter(S2ConfigFactory.config, new ExactStorageGraph(S2ConfigFactory.config))
+  val rankingCounter = new RankingCounter(S2ConfigFactory.config, new RankingStorageGraph(S2ConfigFactory.config))
+
+  lazy val producer = getProducer[String, String](StreamingConfig.KAFKA_BROKERS)
+
+  type HashMapAccumulable = Accumulable[MutableHashMap[String, Long], (String, Long)]
+
+  val initialize = {
+    logInfo("initialize CounterFunctions")
+    DBModel.initialize(S2ConfigFactory.config)
+    true
+  }
+
+  def getCountValue(policy: Counter, item: CounterEtlItem): ExactValueMap = {
+    for {
+      dimKeys <- policy.dimensionList
+      dimValues <- getDimensionValues(item.dimension, dimKeys).toSeq
+      eq <- ExactQualifier.getQualifiers(policy.intervals.map(IntervalUnit.withName), item.ts, dimKeys.zip(dimValues).toMap)
+    } yield {
+      eq -> item.value
+    }
+  }.toMap
+
+  def getDimensionValues(dimension: JsValue, keys: Array[String]): Option[Array[String]] = {
+    Try {
+      for {
+        k <- keys
+        jsValue = dimension \ k
+      } yield {
+        jsValue match {
+          case JsNumber(n) => n.toString()
+          case JsString(s) => s
+          case _ => throw new Exception()
+        }
+      }
+    }.toOption
+  }
+  
+  def exactMapper(item: CounterEtlItem): Option[(ExactKeyTrait, ExactValueMap)] = {
+    DefaultCounterModel.findByServiceAction(item.service, item.action).map { policy =>
+      (ExactKey(policy, item.item, checkItemType = true), getCountValue(policy, item))
+    }
+  }
+
+  def rankingMapper(row: ItemRankingRow): Seq[(RankingKey, RankingValueMap)] = {
+    // normal ranking
+    for {
+      (eq, rv) <- row.value
+    } yield {
+      (RankingKey(row.key.policyId, row.key.version, eq), Map(row.key.itemKey 
-> rv))
+    }
+  }.toSeq
+
+  def logToRankValue(log: TrxLog): Option[(ExactKeyTrait, Map[ExactQualifier, RankingValue])] = {
+    DefaultCounterModel.findById(log.policyId).map { policy =>
+      val key = ExactKey(policy, log.item, checkItemType = false)
+      val value = {
+        for {
+          result <- log.results
+        } yield {
+          ExactQualifier(TimedQualifier(result.interval, result.ts), 
result.dimension) -> RankingValue(result.result, result.value)
+        }
+      }.toMap
+      key -> value
+    }
+  }
+
+  def reduceValue[T, U](op: (U, U) => U, default: U)(m1: Map[T, U], m2: Map[T, U]): Map[T, U] = {
+    m1 ++ m2.map { case (k, v) =>
+      k -> op(m1.getOrElse(k, default), v)
+    }
+  }
+
+  def makeExactRdd(rdd: RDD[(String, String)], numPartitions: Int): RDD[(ExactKeyTrait, ExactValueMap)] = {
+    rdd.mapPartitions { part =>
+      assert(initialize)
+      for {
+        (k, v) <- part
+        line <- GraphUtil.parseString(v)
+        item <- CounterEtlItem(line).toSeq
+        ev <- exactMapper(item).toSeq
+      } yield {
+        ev
+      }
+    }.reduceByKey(reduceValue[ExactQualifier, Long](_ + _, 0L)(_, _), 
numPartitions)
+  }
+
+  def makeRankingRdd(rdd: RDD[(String, String)], numPartitions: Int): RDD[(RankingKey, RankingValueMap)] = {
+    val logRdd = makeTrxLogRdd(rdd, numPartitions)
+    makeRankingRddFromTrxLog(logRdd, numPartitions)
+  }
+
+  def makeRankingRddFromTrxLog(rdd: RDD[TrxLog], numPartitions: Int): RDD[(RankingKey, RankingValueMap)] = {
+    val itemRankingRdd = makeItemRankingRdd(rdd, numPartitions).cache()
+    try {
+      rankingCount(itemRankingRdd, numPartitions) union
+        rateRankingCount(itemRankingRdd, numPartitions) union
+        trendRankingCount(itemRankingRdd, numPartitions) coalesce numPartitions
+    } finally {
+      itemRankingRdd.unpersist(false)
+    }
+  }
+
+  def makeTrxLogRdd(rdd: RDD[(String, String)], numPartitions: Int): RDD[TrxLog] = {
+    rdd.mapPartitions { part =>
+      assert(initialize)
+      for {
+        (k, v) <- part
+        line <- GraphUtil.parseString(v)
+        trxLog = Json.parse(line).as[TrxLog] if trxLog.success
+      } yield {
+        trxLog
+      }
+    }
+  }
+
+  def rankingCount(rdd: RDD[ItemRankingRow], numPartitions: Int): RDD[(RankingKey, RankingValueMap)] = {
+    rdd.mapPartitions { part =>
+      for {
+        row <- part
+        rv <- rankingMapper(row)
+      } yield {
+        rv
+      }
+    }.reduceByKey(reduceValue(RankingValue.reduce, RankingValue(0, 0))(_, _), 
numPartitions)
+  }
+
+  case class ItemRankingRow(key: ExactKeyTrait, value: Map[ExactQualifier, RankingValue])
+
+  def makeItemRankingRdd(rdd: RDD[TrxLog], numPartitions: Int): RDD[ItemRankingRow] = {
+    rdd.mapPartitions { part =>
+      for {
+        log <- part
+        rv <- logToRankValue(log)
+      } yield {
+        rv
+      }
+    }.reduceByKey(reduceValue(RankingValue.reduce, RankingValue(0, 0))(_, _), 
numPartitions).mapPartitions { part =>
+      for {
+        (key, value) <- part
+      } yield {
+        ItemRankingRow(key, value)
+      }
+    }
+  }
+
+  def mapTrendRankingValue(rows: Seq[ItemRankingRow]): Seq[(ExactKeyTrait, Map[ExactQualifier, RateRankingValue])] = {
+    for {
+      row <- rows
+      trendPolicy <- DefaultCounterModel.findByTrendActionId(row.key.policyId)
+    } yield {
+      val key = ExactKey(trendPolicy, row.key.itemKey, checkItemType = false)
+      val value = row.value.filter { case (eq, rv) =>
+        // eq filter by rate policy dimension
+        trendPolicy.dimensionSet.exists { dimSet => dimSet == 
eq.dimKeyValues.keys }
+      }.map { case (eq, rv) =>
+        eq -> RateRankingValue(rv.score, -1)
+      }
+      (key, value)
+    }
+  }
+
+  def mapRateRankingValue(rows: Seq[ItemRankingRow]): Seq[(ExactKeyTrait, Map[ExactQualifier, RateRankingValue])] = {
+    val actionPart = {
+      for {
+        row <- rows
+        ratePolicy <- DefaultCounterModel.findByRateActionId(row.key.policyId)
+      } yield {
+        val key = ExactKey(ratePolicy, row.key.itemKey, checkItemType = false)
+        val value = row.value.filter { case (eq, rv) =>
+          // eq filter by rate policy dimension
+          ratePolicy.dimensionSet.exists { dimSet => dimSet == 
eq.dimKeyValues.keys }
+        }.map { case (eq, rv) =>
+          eq -> RateRankingValue(rv.score, -1)
+        }
+        (key, value)
+      }
+    }
+
+    val basePart = {
+      for {
+        row <- rows
+        ratePolicy <- DefaultCounterModel.findByRateBaseId(row.key.policyId)
+      } yield {
+        val key = ExactKey(ratePolicy, row.key.itemKey, checkItemType = false)
+        val value = row.value.filter { case (eq, rv) =>
+          // eq filter by rate policy dimension
+          ratePolicy.dimensionSet.exists { dimSet => dimSet == 
eq.dimKeyValues.keys }
+        }.map { case (eq, rv) =>
+          eq -> RateRankingValue(-1, rv.score)
+        }
+        (key, value)
+      }
+    }
+
+    actionPart ++ basePart
+  }
+
+  def trendRankingCount(rdd: RDD[ItemRankingRow], numPartitions: Int): RDD[(RankingKey, RankingValueMap)] = {
+    rdd.mapPartitions { part =>
+      mapTrendRankingValue(part.toSeq) toIterator
+    }.reduceByKey(reduceValue(RateRankingValue.reduce, RateRankingValue(-1, 
-1))(_, _), numPartitions).mapPartitions { part =>
+      val missingByPolicy = {
+        for {
+          (key, value) <- part.toSeq
+          trendPolicy <- DefaultCounterModel.findById(key.policyId).toSeq
+          actionId <- trendPolicy.rateActionId.toSeq
+          actionPolicy <- DefaultCounterModel.findById(actionId).toSeq
+        } yield {
+          // filter total eq
+          val missingQualifiersWithRRV = value.filterKeys { eq => eq.tq.q != 
IntervalUnit.TOTAL }
+          (actionPolicy, key, missingQualifiersWithRRV)
+        }
+      }.groupBy(_._1).mapValues(seq => seq.map(x => (x._2, x._3)))
+
+      val filled = {
+        for {
+          (policy, missing) <- missingByPolicy.toSeq
+          keyWithPast = exactCounter.getPastCounts(policy, missing.map { case 
(k, v) => k.itemKey -> v.keys.toSeq })
+          (key, current) <- missing
+        } yield {
+          val past = keyWithPast.getOrElse(key.itemKey, 
Map.empty[ExactQualifier, Long])
+          val base = past.mapValues(l => RateRankingValue(-1, l))
+//          log.warn(s"trend: $policy $key -> $current $base")
+          key -> reduceValue(RateRankingValue.reduce, RateRankingValue(-1, 
-1))(current, base)
+        }
+      }
+
+//      filled.foreach(println)
+
+      {
+        // filter by rate threshold
+        for {
+          (key, value) <- filled
+          ratePolicy <- DefaultCounterModel.findById(key.policyId).toSeq
+          (eq, rrv) <- value if rrv.baseScore >= 
ratePolicy.rateThreshold.getOrElse(Int.MinValue)
+        } yield {
+          (RankingKey(key.policyId, key.version, eq), Map(key.itemKey -> 
rrv.rankingValue))
+        }
+      } toIterator
+    }.reduceByKey(reduceValue(RankingValue.reduce, RankingValue(0, 0))(_, _), 
numPartitions)
+  }
+
+  def rateRankingCount(rdd: RDD[ItemRankingRow], numPartitions: Int): RDD[(RankingKey, RankingValueMap)] = {
+    rdd.mapPartitions { part =>
+      mapRateRankingValue(part.toSeq) toIterator
+    }.reduceByKey(reduceValue(RateRankingValue.reduce, RateRankingValue(-1, 
-1))(_, _), numPartitions).mapPartitions { part =>
+      val seq = part.toSeq
+//      seq.foreach(x => println(s"item ranking row>> $x"))
+
+      // find and evaluate action value is -1
+      val actionMissingByPolicy = {
+        for {
+          (key, value) <- seq if value.exists { case (eq, rrv) => 
rrv.actionScore == -1 }
+          ratePolicy <- DefaultCounterModel.findById(key.policyId).toSeq
+          actionId <- ratePolicy.rateActionId.toSeq
+          actionPolicy <- DefaultCounterModel.findById(actionId)
+        } yield {
+          (actionPolicy, key, value.filter { case (eq, rrv) => rrv.actionScore 
== -1 })
+        }
+      }.groupBy(_._1).mapValues(seq => seq.map(x => (x._2, x._3)))
+
+      val actionFilled = {
+        for {
+          (actionPolicy, actionMissing) <- actionMissingByPolicy.toSeq
+          keyWithRelated = exactCounter.getRelatedCounts(actionPolicy, 
actionMissing.map { case (k, v) => k.itemKey -> v.keys.toSeq })
+          (key, current) <- actionMissing
+        } yield {
+          val related = keyWithRelated.getOrElse(key.itemKey, 
Map.empty[ExactQualifier, Long])
+          val found = related.mapValues(l => RateRankingValue(l, -1))
+          val filled = reduceValue(RateRankingValue.reduce, 
RateRankingValue(-1, -1))(current, found)
+//          log.warn(s"action: $key -> $found $filled")
+          key -> filled
+        }
+      }
+
+//      actionFilled.foreach(x => println(s"action filled>> $x"))
+
+      // find and evaluate base value is -1
+      val baseMissingByPolicy = {
+        for {
+          (key, value) <- seq if value.exists { case (eq, rrv) => 
rrv.baseScore == -1 }
+          ratePolicy <- DefaultCounterModel.findById(key.policyId).toSeq
+          baseId <- ratePolicy.rateBaseId.toSeq
+          basePolicy <- DefaultCounterModel.findById(baseId)
+        } yield {
+          (basePolicy, key, value.filter { case (eq, rrv) => rrv.baseScore == 
-1 })
+        }
+      }.groupBy(_._1).mapValues(seq => seq.map(x => (x._2, x._3)))
+
+      val baseFilled = {
+        for {
+          (basePolicy, baseMissing) <- baseMissingByPolicy.toSeq
+          keyWithRelated = exactCounter.getRelatedCounts(basePolicy, 
baseMissing.map { case (k, v) => k.itemKey -> v.keys.toSeq })
+          (key, current) <- baseMissing
+        } yield {
+          val related = keyWithRelated.getOrElse(key.itemKey, 
Map.empty[ExactQualifier, Long])
+          val found = related.mapValues(l => RateRankingValue(-1, l))
+          val filled = reduceValue(RateRankingValue.reduce, 
RateRankingValue(-1, -1))(current, found)
+//          log.warn(s"base: $basePolicy $key -> $found $filled")
+          key -> filled
+        }
+      }
+
+//      baseFilled.foreach(x => println(s"base filled>> $x"))
+
+      val alreadyFilled = {
+        for {
+          (key, value) <- seq if value.exists { case (eq, rrv) => rrv.actionScore != -1 && rrv.baseScore != -1 }
+        } yield {
+          key -> value.filter { case (eq, rrv) => rrv.actionScore != -1 && rrv.baseScore != -1 }
+        }
+      }
+
+      val rtn = {
+        // filter by rate threshold
+        for {
+          (key, value) <- actionFilled ++ baseFilled ++ alreadyFilled
+          ratePolicy <- DefaultCounterModel.findById(key.policyId).toSeq
+          (eq, rrv) <- value if rrv.baseScore >= ratePolicy.rateThreshold.getOrElse(Int.MinValue)
+        } yield {
+          (RankingKey(key.policyId, key.version, eq), Map(key.itemKey -> rrv.rankingValue))
+        }
+      }
+      rtn.toIterator
+    }.reduceByKey(reduceValue(RankingValue.reduce, RankingValue(0, 0))(_, _), numPartitions)
+  }
+
+  def insertBlobValue(keys: Seq[BlobExactKey], acc: HashMapAccumulable): Unit = {
+    val keyByPolicy = {
+      for {
+        key <- keys
+        policy <- DefaultCounterModel.findById(key.policyId)
+      } yield {
+        (policy, key)
+      }
+    }.groupBy(_._1).mapValues(values => values.map(_._2))
+
+    for {
+      (policy, allKeys) <- keyByPolicy
+      keys <- allKeys.grouped(10)
+      success <- exactCounter.insertBlobValue(policy, keys)
+    } yield {
+      success match {
+        case true => acc += ("BLOB", 1)
+        case false => acc += ("BLOBFailed", 1)
+      }
+    }
+  }
+
+  def updateExactCounter(counts: Seq[(ExactKeyTrait, ExactValueMap)], acc: HashMapAccumulable): Seq[TrxLog] = {
+    val countsByPolicy = {
+      for {
+        (key, count) <- counts
+        policy <- DefaultCounterModel.findById(key.policyId)
+      } yield {
+        (policy, (key, count))
+      }
+    }.groupBy { case (policy, _) => policy }.mapValues(values => values.map(_._2))
+
+    for {
+      (policy, allCounts) <- countsByPolicy
+      counts <- allCounts.grouped(10)
+      trxLog <- exactCounter.updateCount(policy, counts)
+    } yield {
+      trxLog.success match {
+        case true => acc += (s"ExactV${policy.version}", 1)
+        case false => acc += (s"ExactFailedV${policy.version}", 1)
+      }
+      trxLog
+    }
+  }.toSeq
+
+  def exactCountFromEtl(rdd: RDD[CounterEtlItem], numPartitions: Int): RDD[(ExactKeyTrait, ExactValueMap)] = {
+    rdd.mapPartitions { part =>
+      for {
+        item <- part
+        ev <- exactMapper(item).toSeq
+      } yield {
+        ev
+      }
+    }.reduceByKey(reduceValue[ExactQualifier, Long](_ + _, 0L)(_, _), numPartitions)
+  }
+
+  def updateRankingCounter(values: TraversableOnce[(RankingKey, RankingValueMap)], acc: HashMapAccumulable): Unit = {
+    assert(initialize)
+    val valuesByPolicy = {
+      for {
+        (key, value) <- values.toSeq
+        policy <- DefaultCounterModel.findById(key.policyId)
+        if policy.useRank && rankingCounter.ready(policy) // update only when the rank counter is enabled and ready
+      } yield {
+        (policy, (key, value))
+      }
+    }.groupBy { case (policy, _) => policy }.mapValues(values => values.map(_._2))
+
+    for {
+      (policy, allValues) <- valuesByPolicy
+      groupedValues <- allValues.grouped(10)
+    } {
+      rankingCounter.update(groupedValues, K_MAX)
+      acc += (s"RankingV${policy.version}", groupedValues.length)
+    }
+  }
+  
+  def produceTrxLog(trxLogs: TraversableOnce[TrxLog]): Unit = {
+    for {
+      trxLog <- trxLogs
+    } {
+      val topic = trxLog.success match {
+        case true => StreamingConfig.KAFKA_TOPIC_COUNTER_TRX
+        case false => StreamingConfig.KAFKA_TOPIC_COUNTER_FAIL
+      }
+      val msg = new KeyedMessage[String, String](topic, s"${trxLog.policyId}${trxLog.item}", Json.toJson(trxLog).toString())
+      producer.send(msg)
+    }
+  }
+}
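The fill logic above hinges on reduceValue merging two per-qualifier maps with a reduce function and a default value. Below is a minimal standalone sketch of that merge pattern; the RateRankingValue case class and its reduce are simplified stand-ins (assumed here to keep the non-missing score per field, with -1 as the sentinel), not the project's actual definitions.

    // Sketch only: simplified stand-ins for the project's types.
    case class RateRankingValue(actionScore: Long, baseScore: Long)

    object RateRankingValue {
      // Assumption: reduce keeps the non-missing score per field (-1 marks "missing").
      def reduce(a: RateRankingValue, b: RateRankingValue): RateRankingValue =
        RateRankingValue(math.max(a.actionScore, b.actionScore), math.max(a.baseScore, b.baseScore))
    }

    // Same shape as reduceValue(op, default)(m1, m2): key-wise merge of two maps.
    def reduceValue[K, V](op: (V, V) => V, default: V)(m1: Map[K, V], m2: Map[K, V]): Map[K, V] =
      m1 ++ m2.map { case (k, v) => k -> op(m1.getOrElse(k, default), v) }

    val current = Map("total" -> RateRankingValue(3, -1))   // base score still missing
    val found   = Map("total" -> RateRankingValue(-1, 10))  // looked up via getRelatedCounts
    val filled  = reduceValue[String, RateRankingValue](RateRankingValue.reduce, RateRankingValue(-1, -1))(current, found)
    // filled == Map("total" -> RateRankingValue(3, 10))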

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/core/DimensionProps.scala
----------------------------------------------------------------------
diff --git a/s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/core/DimensionProps.scala b/s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/core/DimensionProps.scala
new file mode 100644
index 0000000..b1ebe50
--- /dev/null
+++ b/s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/core/DimensionProps.scala
@@ -0,0 +1,150 @@
+package org.apache.s2graph.counter.loader.core
+
+import org.apache.commons.httpclient.HttpStatus
+import org.apache.s2graph.core.mysqls.{Bucket, Experiment, Service}
+import org.apache.s2graph.counter.loader.config.StreamingConfig
+import org.apache.s2graph.counter.models.Counter
+import org.apache.s2graph.counter.util.{RetryAsync, CollectionCache, CollectionCacheConfig}
+import org.slf4j.LoggerFactory
+import play.api.libs.json._
+import scala.annotation.tailrec
+import scala.concurrent.ExecutionContext.Implicits.global
+import scala.concurrent.duration._
+import scala.concurrent.{Await, Future}
+import scala.util.Try
+
+object DimensionProps {
+  // using play-ws without play app
+  private val builder = new com.ning.http.client.AsyncHttpClientConfig.Builder()
+  private val client = new play.api.libs.ws.ning.NingWSClient(builder.build)
+  private val log = LoggerFactory.getLogger(this.getClass)
+
+  private val retryCnt = 3
+
+  val cacheConfig = CollectionCacheConfig(StreamingConfig.PROFILE_CACHE_MAX_SIZE,
+    StreamingConfig.PROFILE_CACHE_TTL_SECONDS,
+    negativeCache = true,
+    3600 // negative ttl 1 hour
+  )
+  val cache: CollectionCache[Option[JsValue]] = new CollectionCache[Option[JsValue]](cacheConfig)
+
+  @tailrec
+  private[counter] def makeRequestBody(requestBody: String, keyValues: List[(String, String)]): String = {
+    keyValues match {
+      case head :: tail =>
+        makeRequestBody(requestBody.replace(head._1, head._2), tail)
+      case Nil => requestBody
+    }
+  }
+
+  private[counter] def query(bucket: Bucket, item: CounterEtlItem): Future[Option[JsValue]] = {
+    val keyValues = (item.dimension.as[JsObject] ++ item.property.as[JsObject] fields)
+      .filter { case (key, _) => key.startsWith("[[") && key.endsWith("]]") }
+      .map { case (key, jsValue) =>
+        val replacement = jsValue match {
+          case JsString(s) => s
+          case value => value.toString()
+        }
+        key -> replacement
+      }.toList
+
+    val cacheKey = s"${bucket.impressionId}=" + keyValues.flatMap(x => 
Seq(x._1, x._2)).mkString("_")
+
+    cache.withCacheAsync(cacheKey) {
+      val retryFuture = RetryAsync(retryCnt, withSleep = false) {
+        val future = bucket.httpVerb.toUpperCase match {
+          case "GET" =>
+            client.url(bucket.apiPath).get()
+          case "POST" =>
+            val newBody = makeRequestBody(bucket.requestBody, keyValues)
+            client.url(bucket.apiPath).post(Json.parse(newBody))
+        }
+
+        future.map { resp =>
+          resp.status match {
+            case HttpStatus.SC_OK =>
+              val json = Json.parse(resp.body)
+              for {
+                results <- (json \ "results").asOpt[Seq[JsValue]]
+                result <- results.headOption
+                props <- (result \ "props").asOpt[JsValue]
+              } yield {
+                props
+              }
+            case _ =>
+              log.error(s"${resp.body}(${resp.status}}) item: $item")
+              None
+          }
+        }
+      }
+
+      // log when all retries have failed
+      retryFuture onFailure { case t => log.error(s"${t.getMessage} item: $item") }
+
+      retryFuture
+    }
+  }
+
+  private[counter] def query(service: Service, experiment: Experiment, item: CounterEtlItem): Future[Option[JsValue]] = {
+    val keyValues = (item.dimension.as[JsObject] ++ item.property.as[JsObject] fields)
+      .filter { case (key, _) => key.startsWith("[[") && key.endsWith("]]") }.toMap
+
+    val cacheKey = s"${experiment.name}=" + keyValues.flatMap(x => Seq(x._1, x._2)).mkString("_")
+
+    cache.withCacheAsync(cacheKey) {
+      val retryFuture = RetryAsync(retryCnt, withSleep = false) {
+        val url = s"${StreamingConfig.GRAPH_URL}/graphs/experiment/${service.accessToken}/${experiment.name}/0"
+        val future = client.url(url).post(Json.toJson(keyValues))
+
+        future.map { resp =>
+          resp.status match {
+            case HttpStatus.SC_OK =>
+              val json = Json.parse(resp.body)
+              for {
+                results <- (json \ "results").asOpt[Seq[JsValue]]
+                result <- results.headOption
+                props <- (result \ "props").asOpt[JsValue]
+              } yield {
+                props
+              }
+            case _ =>
+              log.error(s"${resp.body}(${resp.status}}) item: $item")
+              None
+          }
+        }
+      }
+
+      // log when all retries have failed
+      retryFuture onFailure { case t => log.error(s"${t.getMessage} item: $item") }
+
+      retryFuture
+    }
+  }
+
+  def mergeDimension(policy: Counter, items: List[CounterEtlItem]): List[CounterEtlItem] = {
+    for {
+      impId <- policy.bucketImpId
+      bucket <- Bucket.findByImpressionId(impId)
+      experiment <- Experiment.findById(bucket.experimentId)
+      service <- Try { Service.findById(experiment.serviceId) }.toOption
+    } yield {
+      val futures = {
+        for {
+          item <- items
+        } yield {
+          query(service, experiment, item).map {
+            case Some(jsValue) =>
+              val newDimension = item.dimension.as[JsObject] ++ jsValue.as[JsObject]
+              item.copy(dimension = newDimension)
+            case None => item
+          }
+        }
+      }
+      Await.result(Future.sequence(futures), 10 seconds)
+    }
+  }.getOrElse(items)
+
+  def getCacheStatsString: String = {
+    cache.getStatsString
+  }
+}
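The [[key]] placeholder convention used by query and makeRequestBody can be exercised in isolation. The sketch below mirrors that substitution; the template and key/value pairs are made-up examples, not real bucket definitions.

    import scala.annotation.tailrec

    // Same tail-recursive substitution idea as DimensionProps.makeRequestBody.
    @tailrec
    def makeRequestBody(requestBody: String, keyValues: List[(String, String)]): String =
      keyValues match {
        case (token, value) :: tail => makeRequestBody(requestBody.replace(token, value), tail)
        case Nil => requestBody
      }

    val template  = """{"srcVertices": [{"id": "[[userId]]"}], "limit": [[limit]]}"""
    val keyValues = List("[[userId]]" -> "u123", "[[limit]]" -> "10")
    println(makeRequestBody(template, keyValues))
    // {"srcVertices": [{"id": "u123"}], "limit": 10}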

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/models/DefaultCounterModel.scala
----------------------------------------------------------------------
diff --git a/s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/models/DefaultCounterModel.scala b/s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/models/DefaultCounterModel.scala
new file mode 100644
index 0000000..5908e1c
--- /dev/null
+++ b/s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/models/DefaultCounterModel.scala
@@ -0,0 +1,6 @@
+package org.apache.s2graph.counter.loader.models
+
+import org.apache.s2graph.counter.models.CounterModel
+import org.apache.s2graph.spark.config.S2ConfigFactory
+
+case object DefaultCounterModel extends CounterModel(S2ConfigFactory.config)
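DefaultCounterModel simply binds CounterModel to the shared S2ConfigFactory config so counter policies can be looked up without threading a config around. A rough usage sketch follows; the policy id is hypothetical and the Option return type is assumed from how findById is used elsewhere in this change.

    // Hypothetical policy id; findById is assumed to return Option[Counter].
    DefaultCounterModel.findById(1) match {
      case Some(policy) => println(s"found counter policy, version ${policy.version}")
      case None         => println("no such policy")
    }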

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/stream/EtlStreaming.scala
----------------------------------------------------------------------
diff --git a/s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/stream/EtlStreaming.scala b/s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/stream/EtlStreaming.scala
new file mode 100644
index 0000000..12f5f73
--- /dev/null
+++ b/s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/stream/EtlStreaming.scala
@@ -0,0 +1,114 @@
+package org.apache.s2graph.counter.loader.stream
+
+import kafka.producer.KeyedMessage
+import kafka.serializer.StringDecoder
+import org.apache.s2graph.core.GraphUtil
+import org.apache.s2graph.counter.config.S2CounterConfig
+import org.apache.s2graph.counter.loader.config.StreamingConfig
+import org.apache.s2graph.counter.loader.core.{DimensionProps, CounterEtlItem, CounterEtlFunctions}
+import org.apache.s2graph.counter.models.{DBModel, CounterModel}
+import org.apache.s2graph.spark.config.S2ConfigFactory
+import org.apache.s2graph.spark.spark.{WithKafka, SparkApp, HashMapParam}
+import org.apache.spark.streaming.Durations._
+import org.apache.spark.streaming.kafka.KafkaRDDFunctions.rddToKafkaRDDFunctions
+import org.apache.spark.streaming.kafka.StreamHelper
+import scala.collection.mutable
+import scala.collection.mutable.{HashMap => MutableHashMap}
+import scala.concurrent.ExecutionContext
+
+object EtlStreaming extends SparkApp with WithKafka {
+  lazy val config = S2ConfigFactory.config
+  lazy val s2Config = new S2CounterConfig(config)
+  lazy val counterModel = new CounterModel(config)
+  lazy val className = getClass.getName.stripSuffix("$")
+  lazy val producer = getProducer[String, String](StreamingConfig.KAFKA_BROKERS)
+
+  implicit val graphEx = ExecutionContext.Implicits.global
+
+  val initialize = {
+    println("streaming initialize")
+//    Graph(config)
+    DBModel.initialize(config)
+    true
+  }
+
+  val inputTopics = Set(StreamingConfig.KAFKA_TOPIC_ETL)
+  val strInputTopics = inputTopics.mkString(",")
+  val groupId = buildKafkaGroupId(strInputTopics, "etl_to_counter")
+  val kafkaParam = Map(
+    "group.id" -> groupId,
+    "metadata.broker.list" -> StreamingConfig.KAFKA_BROKERS,
+    "zookeeper.connect" -> StreamingConfig.KAFKA_ZOOKEEPER,
+    "zookeeper.connection.timeout.ms" -> "10000"
+  )
+  val streamHelper = StreamHelper(kafkaParam)
+
+  override def run(): Unit = {
+    validateArgument("interval")
+    val (intervalInSec) = seconds(args(0).toLong)
+
+    val conf = sparkConf(s"$strInputTopics: $className")
+    val ssc = streamingContext(conf, intervalInSec)
+    val sc = ssc.sparkContext
+
+    val acc = sc.accumulable(MutableHashMap.empty[String, Long], "Throughput")(HashMapParam[String, Long](_ + _))
+
+    /**
+     * read messages from the etl topic, join the user profile from the graph, and then produce the whole message to the counter topic
+     */
+    val stream = streamHelper.createStream[String, String, StringDecoder, StringDecoder](ssc, inputTopics)
+
+    // etl logic
+    stream.foreachRDD { (rdd, ts) =>
+      rdd.foreachPartitionWithOffsetRange { case (osr, part) =>
+        assert(initialize)
+
+        // convert to edge format
+        val items = {
+          for {
+            (k, v) <- part
+            line <- GraphUtil.parseString(v)
+            item <- CounterEtlFunctions.parseEdgeFormat(line)
+          } yield {
+            acc += ("Edges", 1)
+            item
+          }
+        }
+
+        // join user profile
+        val joinItems = items.toList.groupBy { e =>
+          (e.service, e.action)
+        }.flatMap { case ((service, action), v) =>
+          CounterEtlFunctions.checkPolicyAndMergeDimension(service, action, v)
+        }
+
+        // group by kafka partition key and send to kafka
+        val m = MutableHashMap.empty[Int, mutable.MutableList[CounterEtlItem]]
+        joinItems.foreach { item =>
+          if (item.useProfile) {
+            acc += ("ETL", 1)
+          }
+          val k = getPartKey(item.item, 20)
+          val values: mutable.MutableList[CounterEtlItem] = m.getOrElse(k, mutable.MutableList.empty[CounterEtlItem])
+          values += item
+          m.update(k, values)
+        }
+        m.foreach { case (k, v) =>
+          v.map(_.toKafkaMessage).grouped(1000).foreach { grouped =>
+            acc += ("Produce", grouped.size)
+            producer.send(new KeyedMessage[String, String](StreamingConfig.KAFKA_TOPIC_COUNTER, null, k, grouped.mkString("\n")))
+          }
+        }
+
+        streamHelper.commitConsumerOffset(osr)
+      }
+
+      if (ts.milliseconds / 1000 % 60 == 0) {
+        log.warn(DimensionProps.getCacheStatsString)
+      }
+    }
+
+    ssc.start()
+    ssc.awaitTermination()
+  }
+}
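The grouping step in EtlStreaming buckets enriched items by a Kafka partition key before producing, so messages for the same item land on the same partition. The self-contained sketch below illustrates that idea; getPartKey is assumed here to be a non-negative hash modulo the bucket count (the real helper comes from WithKafka).

    import scala.util.hashing.MurmurHash3

    // Assumption: getPartKey hashes the item id into one of `buckets` partition keys.
    def getPartKey(key: String, buckets: Int): Int = {
      val h = MurmurHash3.stringHash(key) % buckets
      if (h < 0) h + buckets else h
    }

    val items = Seq("item-1", "item-2", "item-3")  // stand-ins for CounterEtlItem.item
    val byPartition = items.groupBy(getPartKey(_, 20))
    byPartition.foreach { case (k, grouped) =>
      // in the job, each group is further split into batches of 1000 and sent as one message
      println(s"partition key $k -> ${grouped.size} item(s)")
    }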

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/stream/ExactCounterStreaming.scala
----------------------------------------------------------------------
diff --git a/s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/stream/ExactCounterStreaming.scala b/s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/stream/ExactCounterStreaming.scala
new file mode 100644
index 0000000..3eea406
--- /dev/null
+++ b/s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/stream/ExactCounterStreaming.scala
@@ -0,0 +1,69 @@
+package org.apache.s2graph.counter.loader.stream
+
+import kafka.serializer.StringDecoder
+import org.apache.s2graph.counter.config.S2CounterConfig
+import org.apache.s2graph.counter.loader.config.StreamingConfig
+import org.apache.s2graph.counter.loader.core.CounterFunctions
+import org.apache.s2graph.spark.config.S2ConfigFactory
+import org.apache.s2graph.spark.spark.{WithKafka, SparkApp, HashMapParam}
+import org.apache.spark.streaming.Durations._
+import org.apache.spark.streaming.kafka.KafkaRDDFunctions.rddToKafkaRDDFunctions
+import org.apache.spark.streaming.kafka.{HasOffsetRanges, StreamHelper}
+import scala.collection.mutable.{HashMap => MutableHashMap}
+import scala.language.postfixOps
+
+object ExactCounterStreaming extends SparkApp with WithKafka {
+  lazy val config = S2ConfigFactory.config
+  lazy val s2Config = new S2CounterConfig(config)
+  lazy val className = getClass.getName.stripSuffix("$")
+
+  lazy val producer = getProducer[String, String](StreamingConfig.KAFKA_BROKERS)
+
+  val inputTopics = Set(StreamingConfig.KAFKA_TOPIC_COUNTER)
+  val strInputTopics = inputTopics.mkString(",")
+  val groupId = buildKafkaGroupId(strInputTopics, "counter_v2")
+  val kafkaParam = Map(
+//    "auto.offset.reset" -> "smallest",
+    "group.id" -> groupId,
+    "metadata.broker.list" -> StreamingConfig.KAFKA_BROKERS,
+    "zookeeper.connect" -> StreamingConfig.KAFKA_ZOOKEEPER,
+    "zookeeper.connection.timeout.ms" -> "10000"
+  )
+  val streamHelper = StreamHelper(kafkaParam)
+
+  override def run() = {
+    validateArgument("interval", "clear")
+    val (intervalInSec, clear) = (seconds(args(0).toLong), args(1).toBoolean)
+
+    if (clear) {
+      streamHelper.kafkaHelper.consumerGroupCleanup()
+    }
+
+    val conf = sparkConf(s"$strInputTopics: $className")
+    val ssc = streamingContext(conf, intervalInSec)
+    val sc = ssc.sparkContext
+
+    implicit val acc: HashMapAccumulable = sc.accumulable(MutableHashMap.empty[String, Long], "Throughput")(HashMapParam[String, Long](_ + _))
+
+    // make stream
+    val stream = streamHelper.createStream[String, String, StringDecoder, StringDecoder](ssc, inputTopics)
+    stream.foreachRDD { (rdd, ts) =>
+      val offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
+
+      val exactRDD = CounterFunctions.makeExactRdd(rdd, offsets.length)
+
+      // for at-least-once semantics
+      exactRDD.foreachPartitionWithIndex { (i, part) =>
+        // update exact counter
+        val trxLogs = CounterFunctions.updateExactCounter(part.toSeq, acc)
+        CounterFunctions.produceTrxLog(trxLogs)
+
+        // commit offset range
+        streamHelper.commitConsumerOffset(offsets(i))
+      }
+    }
+
+    ssc.start()
+    ssc.awaitTermination()
+  }
+}
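The foreachPartitionWithIndex block above follows the at-least-once recipe: update the exact counters first and commit the partition's offset range last, so a crash before the commit replays the batch instead of dropping it. The sketch below mirrors only that ordering with stand-in types; OffsetRange and commit here are illustrative, not the spark-streaming-kafka API.

    // Stand-ins for illustration only.
    case class OffsetRange(topic: String, partition: Int, fromOffset: Long, untilOffset: Long)

    def updateCounters(records: Seq[String]): Unit =
      records.foreach(r => println(s"update exact counter with: $r"))

    def commit(osr: OffsetRange): Unit =
      println(s"commit ${osr.topic}-${osr.partition} up to offset ${osr.untilOffset}")

    val offsets    = Array(OffsetRange("s2counter", 0, 100L, 200L))
    val partitions = Seq(Seq("count-message-1", "count-message-2"))

    partitions.zipWithIndex.foreach { case (records, i) =>
      updateCounters(records)  // side effects first ...
      commit(offsets(i))       // ... offsets last, so a failure replays instead of losing data
    }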

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/stream/GraphToETLStreaming.scala
----------------------------------------------------------------------
diff --git a/s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/stream/GraphToETLStreaming.scala b/s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/stream/GraphToETLStreaming.scala
new file mode 100644
index 0000000..9e6d6be
--- /dev/null
+++ b/s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/stream/GraphToETLStreaming.scala
@@ -0,0 +1,80 @@
+package org.apache.s2graph.counter.loader.stream
+
+import kafka.producer.KeyedMessage
+import kafka.serializer.StringDecoder
+import org.apache.s2graph.core.GraphUtil
+import org.apache.s2graph.counter.config.S2CounterConfig
+import org.apache.s2graph.counter.loader.config.StreamingConfig
+import org.apache.s2graph.spark.config.S2ConfigFactory
+import org.apache.s2graph.spark.spark.{WithKafka, SparkApp, HashMapParam}
+import org.apache.spark.streaming.Durations._
+import org.apache.spark.streaming.kafka.KafkaRDDFunctions.rddToKafkaRDDFunctions
+import scala.collection.mutable
+import scala.collection.mutable.{HashMap => MutableHashMap}
+
+object GraphToETLStreaming extends SparkApp with WithKafka {
+  lazy val config = S2ConfigFactory.config
+  lazy val s2Config = new S2CounterConfig(config)
+  lazy val className = getClass.getName.stripSuffix("$")
+  lazy val producer = getProducer[String, String](StreamingConfig.KAFKA_BROKERS)
+
+  override def run(): Unit = {
+    validateArgument("interval", "topic")
+    val (intervalInSec, topic) = (seconds(args(0).toLong), args(1))
+
+    val groupId = buildKafkaGroupId(topic, "graph_to_etl")
+    val kafkaParam = Map(
+//      "auto.offset.reset" -> "smallest",
+      "group.id" -> groupId,
+      "metadata.broker.list" -> StreamingConfig.KAFKA_BROKERS,
+      "zookeeper.connect" -> StreamingConfig.KAFKA_ZOOKEEPER,
+      "zookeeper.connection.timeout.ms" -> "10000"
+    )
+
+    val conf = sparkConf(s"$topic: $className")
+    val ssc = streamingContext(conf, intervalInSec)
+    val sc = ssc.sparkContext
+
+    val acc = sc.accumulable(MutableHashMap.empty[String, Long], "Throughput")(HashMapParam[String, Long](_ + _))
+
+    /**
+     * consume the graphIn topic and produce messages to the etl topic
+     * this serves two purposes:
+     * 1. partition by target vertex id
+     * 2. expand the kafka partition count
+     */
+    val stream = getStreamHelper(kafkaParam).createStream[String, String, StringDecoder, StringDecoder](ssc, topic.split(',').toSet)
+    stream.foreachRDD { rdd =>
+      rdd.foreachPartitionWithOffsetRange { case (osr, part) =>
+        val m = MutableHashMap.empty[Int, mutable.MutableList[String]]
+        for {
+          (k, v) <- part
+          line <- GraphUtil.parseString(v)
+        } {
+          try {
+            val sp = GraphUtil.split(line)
+            // get partition key by target vertex id
+            val partKey = getPartKey(sp(4), 20)
+            val values = m.getOrElse(partKey, mutable.MutableList.empty[String])
+            values += line
+            m.update(partKey, values)
+          } catch {
+            case ex: Throwable =>
+              log.error(s"$ex: $line")
+          }
+        }
+
+        m.foreach { case (k, v) =>
+          v.grouped(1000).foreach { grouped =>
+            producer.send(new KeyedMessage[String, String](StreamingConfig.KAFKA_TOPIC_ETL, null, k, grouped.mkString("\n")))
+          }
+        }
+
+        getStreamHelper(kafkaParam).commitConsumerOffset(osr)
+      }
+    }
+
+    ssc.start()
+    ssc.awaitTermination()
+  }
+}
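The repartitioning above keys each log line by its target vertex id (field 4 after GraphUtil.split). The sketch below shows that field extraction on a made-up line; the tab-separated layout and field positions are assumptions about the s2graph log format rather than a spec.

    // Assumed layout: ts \t op \t element \t src \t tgt \t label \t props
    def split(line: String): Array[String] = line.split("\t", -1)

    val line = "1474869840000\tinsert\tedge\tsrc123\ttgt456\tfriend\t{}"  // made-up example
    val sp = split(line)
    val partKey = math.abs(sp(4).hashCode % 20)  // same idea as getPartKey(sp(4), 20)
    println(s"target vertex ${sp(4)} -> partition key $partKey")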

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/stream/RankingCounterStreaming.scala
----------------------------------------------------------------------
diff --git a/s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/stream/RankingCounterStreaming.scala b/s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/stream/RankingCounterStreaming.scala
new file mode 100644
index 0000000..2c2335b
--- /dev/null
+++ b/s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/stream/RankingCounterStreaming.scala
@@ -0,0 +1,72 @@
+package org.apache.s2graph.counter.loader.stream
+
+import kafka.serializer.StringDecoder
+import org.apache.s2graph.counter.config.S2CounterConfig
+import org.apache.s2graph.counter.loader.config.StreamingConfig
+import org.apache.s2graph.counter.loader.core.CounterFunctions
+import org.apache.s2graph.spark.config.S2ConfigFactory
+import org.apache.s2graph.spark.spark.{WithKafka, SparkApp, HashMapParam}
+import org.apache.spark.streaming.Durations._
+import org.apache.spark.streaming.kafka.KafkaRDDFunctions.rddToKafkaRDDFunctions
+import org.apache.spark.streaming.kafka.{HasOffsetRanges, StreamHelper}
+import scala.collection.mutable.{HashMap => MutableHashMap}
+
+object RankingCounterStreaming extends SparkApp with WithKafka {
+  lazy val config = S2ConfigFactory.config
+  lazy val s2Config = new S2CounterConfig(config)
+  lazy val className = getClass.getName.stripSuffix("$")
+
+  lazy val producer = getProducer[String, String](StreamingConfig.KAFKA_BROKERS)
+
+  val inputTopics = Set(StreamingConfig.KAFKA_TOPIC_COUNTER_TRX)
+  val strInputTopics = inputTopics.mkString(",")
+  val groupId = buildKafkaGroupId(strInputTopics, "ranking_v2")
+  val kafkaParam = Map(
+//    "auto.offset.reset" -> "smallest",
+    "group.id" -> groupId,
+    "metadata.broker.list" -> StreamingConfig.KAFKA_BROKERS,
+    "zookeeper.connect" -> StreamingConfig.KAFKA_ZOOKEEPER,
+    "zookeeper.connection.timeout.ms" -> "10000"
+  )
+  val streamHelper = StreamHelper(kafkaParam)
+
+  override def run() = {
+    validateArgument("interval", "clear")
+    val (intervalInSec, clear) = (seconds(args(0).toLong), args(1).toBoolean)
+
+    if (clear) {
+      streamHelper.kafkaHelper.consumerGroupCleanup()
+    }
+
+    val conf = sparkConf(s"$strInputTopics: $className")
+    val ssc = streamingContext(conf, intervalInSec)
+    val sc = ssc.sparkContext
+
+    implicit val acc: HashMapAccumulable = sc.accumulable(MutableHashMap.empty[String, Long], "Throughput")(HashMapParam[String, Long](_ + _))
+
+    // make stream
+    val stream = streamHelper.createStream[String, String, StringDecoder, StringDecoder](ssc, inputTopics)
+    stream.foreachRDD { (rdd, ts) =>
+      // for at-least-once semantics
+      val nextRdd = {
+        CounterFunctions.makeRankingRdd(rdd, sc.defaultParallelism).foreachPartition { part =>
+          // update ranking counter
+          CounterFunctions.updateRankingCounter(part, acc)
+        }
+        rdd
+      }
+
+      streamHelper.commitConsumerOffsets(nextRdd.asInstanceOf[HasOffsetRanges])
+//      CounterFunctions.makeRankingRdd(rdd, offsets.length).foreachPartitionWithIndex { (i, part) =>
+//        // update ranking counter
+//        CounterFunctions.updateRankingCounter(part, acc)
+//
+//        // commit offset range
+//        streamHelper.commitConsumerOffset(offsets(i))
+//      }
+    }
+
+    ssc.start()
+    ssc.awaitTermination()
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2counter_loader/src/main/scala/s2/config/StreamingConfig.scala
----------------------------------------------------------------------
diff --git a/s2counter_loader/src/main/scala/s2/config/StreamingConfig.scala b/s2counter_loader/src/main/scala/s2/config/StreamingConfig.scala
deleted file mode 100644
index ba5e863..0000000
--- a/s2counter_loader/src/main/scala/s2/config/StreamingConfig.scala
+++ /dev/null
@@ -1,24 +0,0 @@
-package s2.config
-
-/**
- * Created by hsleep([email protected]) on 15. 4. 7..
- */
-object StreamingConfig extends ConfigFunctions(S2ConfigFactory.config) {
-  // kafka
-  val KAFKA_ZOOKEEPER = getOrElse("kafka.zookeeper", "localhost")
-  val KAFKA_BROKERS = getOrElse("kafka.brokers", "localhost")
-  val KAFKA_TOPIC_GRAPH = getOrElse("kafka.topic.graph", "s2graphInalpha")
-  val KAFKA_TOPIC_ETL = getOrElse("kafka.topic.etl", "s2counter-etl-alpha")
-  val KAFKA_TOPIC_COUNTER = getOrElse("kafka.topic.counter", "s2counter-alpha")
-  val KAFKA_TOPIC_COUNTER_TRX = getOrElse("kafka.topic.counter-trx", 
"s2counter-trx-alpha")
-  val KAFKA_TOPIC_COUNTER_FAIL = getOrElse("kafka.topic.counter-fail", 
"s2counter-fail-alpha")
-
-  // profile cache
-  val PROFILE_CACHE_TTL_SECONDS = getOrElse("profile.cache.ttl.seconds", 60 * 
60 * 24)    // default 1 day
-  val PROFILE_CACHE_MAX_SIZE = getOrElse("profile.cache.max.size", 10000)
-  val PROFILE_PREFETCH_SIZE = getOrElse("profile.prefetch.size", 10)
-
-  // graph url
-  val GRAPH_URL = getOrElse("s2graph.url", "")
-  val GRAPH_READONLY_URL = getOrElse("s2graph.read-only.url", GRAPH_URL)
-}
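The deleted StreamingConfig (superseded by the org.apache.s2graph.counter.loader.config version imported above) resolved every setting with getOrElse(key, default). Assuming ConfigFunctions wraps a Typesafe Config instance, as S2ConfigFactory.config suggests, the lookup pattern is roughly:

    import com.typesafe.config.{Config, ConfigFactory}

    // Minimal sketch of the getOrElse(key, default) lookup, assuming a Typesafe Config backend.
    def getOrElse(config: Config, key: String, default: String): String =
      if (config.hasPath(key)) config.getString(key) else default

    val config  = ConfigFactory.load()
    val brokers = getOrElse(config, "kafka.brokers", "localhost")
    println(s"kafka brokers: $brokers")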

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2counter_loader/src/main/scala/s2/counter/CounterBulkLoader.scala
----------------------------------------------------------------------
diff --git a/s2counter_loader/src/main/scala/s2/counter/CounterBulkLoader.scala b/s2counter_loader/src/main/scala/s2/counter/CounterBulkLoader.scala
deleted file mode 100644
index 2843022..0000000
--- a/s2counter_loader/src/main/scala/s2/counter/CounterBulkLoader.scala
+++ /dev/null
@@ -1,78 +0,0 @@
-package s2.counter
-
-import com.kakao.s2graph.core.{Graph, GraphUtil}
-import org.apache.spark.SparkContext
-import s2.config.{S2ConfigFactory, S2CounterConfig, StreamingConfig}
-import s2.counter.core.{BlobExactKey, CounterEtlFunctions, CounterFunctions}
-import s2.models.Counter.ItemType
-import s2.models.{CounterModel, DBModel}
-import s2.spark.{HashMapParam, SparkApp, WithKafka}
-
-import scala.collection.mutable.{HashMap => MutableHashMap}
-import scala.concurrent.ExecutionContext
-
-/**
- * Created by rain on 7/1/15.
- */
-object CounterBulkLoader extends SparkApp with WithKafka {
-  lazy val config = S2ConfigFactory.config
-  lazy val s2Config = new S2CounterConfig(config)
-  lazy val counterModel = new CounterModel(config)
-  lazy val className = getClass.getName.stripSuffix("$")
-  lazy val producer = getProducer[String, String](StreamingConfig.KAFKA_BROKERS)
-
-  implicit val graphEx = ExecutionContext.Implicits.global
-
-  val initialize = {
-    println("initialize")
-//    Graph(config)
-    DBModel.initialize(config)
-    true
-  }
-
-  override def run(): Unit = {
-    val hdfsPath = args(0)
-    val blockSize = args(1).toInt
-    val minPartitions = args(2).toInt
-    val conf = sparkConf(s"$hdfsPath: CounterBulkLoader")
-
-    val sc = new SparkContext(conf)
-    val acc = sc.accumulable(MutableHashMap.empty[String, Long], "Throughput")(HashMapParam[String, Long](_ + _))
-
-    val msgs = sc.textFile(hdfsPath)
-
-    val etlRdd = msgs.repartition(minPartitions).mapPartitions { part =>
-      // parse and etl
-      assert(initialize)
-      val items = {
-        for {
-          msg <- part
-          line <- GraphUtil.parseString(msg)
-          sp = GraphUtil.split(line) if sp.size <= 7 || GraphUtil.split(line)(7) != "in"
-          item <- CounterEtlFunctions.parseEdgeFormat(line)
-        } yield {
-          acc +=("Edges", 1)
-          item
-        }
-      }
-      items.grouped(blockSize).flatMap { grouped =>
-        grouped.groupBy(e => (e.service, e.action)).flatMap { case ((service, action), v) =>
-          CounterEtlFunctions.checkPolicyAndMergeDimension(service, action, v.toList)
-        }
-      }
-    }
-
-    val exactRdd = CounterFunctions.exactCountFromEtl(etlRdd, etlRdd.partitions.length)
-    val logRdd = exactRdd.mapPartitions { part =>
-      val seq = part.toSeq
-      CounterFunctions.insertBlobValue(seq.map(_._1).filter(_.itemType == ItemType.BLOB).map(_.asInstanceOf[BlobExactKey]), acc)
-      // update exact counter
-      CounterFunctions.updateExactCounter(seq, acc).toIterator
-    }
-
-    val rankRdd = CounterFunctions.makeRankingRddFromTrxLog(logRdd, logRdd.partitions.length)
-    rankRdd.foreachPartition { part =>
-      CounterFunctions.updateRankingCounter(part, acc)
-    }
-  }
-}
\ No newline at end of file
