http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2counter_loader/src/main/scala/s2/counter/EraseDailyCounter.scala
----------------------------------------------------------------------
diff --git a/s2counter_loader/src/main/scala/s2/counter/EraseDailyCounter.scala b/s2counter_loader/src/main/scala/s2/counter/EraseDailyCounter.scala
deleted file mode 100644
index 71784fa..0000000
--- a/s2counter_loader/src/main/scala/s2/counter/EraseDailyCounter.scala
+++ /dev/null
@@ -1,133 +0,0 @@
-package s2.counter
-
-import java.text.SimpleDateFormat
-
-import kafka.producer.KeyedMessage
-import org.apache.spark.SparkContext
-import org.apache.spark.rdd.RDD
-import play.api.libs.json.Json
-import s2.config.{S2ConfigFactory, StreamingConfig}
-import s2.counter.core.ExactCounter.ExactValueMap
-import s2.counter.core._
-import s2.counter.core.v1.ExactStorageHBase
-import s2.counter.core.v2.ExactStorageGraph
-import s2.models.{Counter, CounterModel, DBModel}
-import s2.spark.{SparkApp, WithKafka}
-
-import scala.collection.mutable
-import scala.collection.mutable.{HashMap => MutableHashMap}
-
-/**
- * Created by hsleep([email protected]) on 15. 7. 1..
- */
-object EraseDailyCounter extends SparkApp with WithKafka {
-  import scala.concurrent.ExecutionContext.Implicits.global
-
-  lazy val producer = getProducer[String, String](StreamingConfig.KAFKA_BROKERS)
-
-  def valueToEtlItem(policy: Counter, key: ExactKeyTrait, values: ExactValueMap): Seq[CounterEtlItem] = {
-    if (values.nonEmpty) {
-      for {
-        (eq, value) <- filter(values.toList)
-      } yield {
-        CounterEtlItem(eq.tq.ts, policy.service, policy.action, key.itemKey, Json.toJson(eq.dimKeyValues), Json.toJson(Map("value" -> -value)))
-      }
-    } else {
-      Nil
-    }
-  }
-
-  def filter(values: List[(ExactQualifier, Long)]): List[(ExactQualifier, Long)] = {
-    val sorted = values.sortBy(_._1.dimKeyValues.size).reverse
-    val (eq, value) = sorted.head
-    val dimKeys = eq.dimKeyValues.toSeq
-    val flat = {
-      for {
-        i <- 0 to dimKeys.length
-        comb <- dimKeys.combinations(i)
-      } yield {
-        ExactQualifier(eq.tq, comb.toMap) -> value
-      }
-    }.toMap
-
-//    println("flat >>>", flat)
-
-    val valuesMap = values.toMap
-    val remain = (valuesMap ++ flat.map { case (k, v) =>
-      k -> (valuesMap(k) - v)
-    }).filter(_._2 > 0).toList
-
-//    println("remain >>>", remain)
-
-    if (remain.isEmpty) {
-      List((eq, value))
-    } else {
-      (eq, value) :: filter(remain)
-    }
-  }
-
-  def produce(policy: Counter, exactRdd: RDD[(ExactKeyTrait, ExactValueMap)]): Unit = {
-    exactRdd.mapPartitions { part =>
-      for {
-        (key, values) <- part
-        item <- valueToEtlItem(policy, key, values)
-      } yield {
-        item
-      }
-    }.foreachPartition { part =>
-      val m = MutableHashMap.empty[Int, mutable.MutableList[CounterEtlItem]]
-      part.foreach { item =>
-        val k = getPartKey(item.item, 20)
-        val values = m.getOrElse(k, mutable.MutableList.empty[CounterEtlItem])
-        values += item
-        m.update(k, values)
-      }
-      m.foreach { case (k, v) =>
-        v.map(_.toKafkaMessage).grouped(1000).foreach { grouped =>
-//          println(grouped)
-          producer.send(new KeyedMessage[String, String](StreamingConfig.KAFKA_TOPIC_COUNTER, null, k, grouped.mkString("\n")))
-        }
-      }
-    }
-  }
-
-  def rddToExactRdd(policy: Counter, date: String, rdd: RDD[String]): RDD[(ExactKeyTrait, ExactValueMap)] = {
-    val dateFormat = new SimpleDateFormat("yyyy-MM-dd")
-    val fromTs = dateFormat.parse(date).getTime
-    val toTs = fromTs + 23 * 60 * 60 * 1000
-
-    rdd.mapPartitions { part =>
-      val exactCounter = policy.version match {
-        case VERSION_1 => new ExactCounter(S2ConfigFactory.config, new ExactStorageHBase(S2ConfigFactory.config))
-        case VERSION_2 => new ExactCounter(S2ConfigFactory.config, new ExactStorageGraph(S2ConfigFactory.config))
-      }
-
-      for {
-        line <- part
-        FetchedCounts(exactKey, qualifierWithCountMap) <- exactCounter.getCount(policy, line, Array(TimedQualifier.IntervalUnit.DAILY), fromTs, toTs)
-      } yield {
-        (exactKey, qualifierWithCountMap)
-      }
-    }
-  }
-
-  lazy val className = getClass.getName.stripSuffix("$")
-
-  override def run(): Unit = {
-    validateArgument("service", "action", "date", "file", "op")
-    DBModel.initialize(S2ConfigFactory.config)
-
-    val (service, action, date, file, op) = (args(0), args(1), args(2), args(3), args(4))
-    val conf = sparkConf(s"$className: $service.$action")
-
-    val ctx = new SparkContext(conf)
-
-    val rdd = ctx.textFile(file, 20)
-
-    val counterModel = new CounterModel(S2ConfigFactory.config)
-
-    val policy = counterModel.findByServiceAction(service, action).get
-    val exactRdd = rddToExactRdd(policy, date, rdd)
-    produce(policy, exactRdd)
-  }
-}
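
A note on the removed filter() above: it performs a roll-up subtraction. The most specific dimension combination is taken first, its count is subtracted from every coarser combination derived from it, and the procedure recurses on whatever remains positive, so each event ends up attributed to exactly one qualifier. Below is a minimal standalone sketch of the same idea, using a plain Map[Map[String, String], Long] in place of the project's ExactQualifier types; all names in it are illustrative only and not part of the codebase.

    object RollupSubtractionSketch {
      type Dims = Map[String, String]

      // take the most specific dimension combination, subtract its count from
      // all of its coarser roll-ups, then recurse on what stays positive
      def subtractRollups(values: Map[Dims, Long]): List[(Dims, Long)] = {
        if (values.isEmpty) Nil
        else {
          val (dims, count) = values.toList.maxBy(_._1.size)
          val keys = dims.toSeq
          val rollups = (0 to keys.length).flatMap(n => keys.combinations(n)).map(_.toMap)
          val remain = rollups.foldLeft(values) { (acc, d) =>
            acc.get(d) match {
              case Some(v) if v - count > 0 => acc.updated(d, v - count)
              case Some(_)                  => acc - d
              case None                     => acc
            }
          }
          (dims, count) :: subtractRollups(remain)
        }
      }

      def main(args: Array[String]): Unit = {
        val counts = Map(
          Map.empty[String, String]           -> 3L,
          Map("gender" -> "M")                -> 3L,
          Map("gender" -> "M", "age" -> "20") -> 2L)
        // the leaf (gender=M, age=20) explains 2 of the 3 events;
        // the remaining 1 is attributed to the (gender=M) roll-up
        subtractRollups(counts).foreach(println)
      }
    }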

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2counter_loader/src/main/scala/s2/counter/core/CounterEtlFunctions.scala
----------------------------------------------------------------------
diff --git a/s2counter_loader/src/main/scala/s2/counter/core/CounterEtlFunctions.scala b/s2counter_loader/src/main/scala/s2/counter/core/CounterEtlFunctions.scala
deleted file mode 100644
index cefce65..0000000
--- a/s2counter_loader/src/main/scala/s2/counter/core/CounterEtlFunctions.scala
+++ /dev/null
@@ -1,89 +0,0 @@
-package s2.counter.core
-
-import com.kakao.s2graph.core.{Edge, Graph, GraphUtil}
-import org.apache.spark.Logging
-import play.api.libs.json._
-import s2.config.{S2ConfigFactory, StreamingConfig}
-import s2.models.CounterModel
-
-import scala.collection.mutable.{HashMap => MutableHashMap}
-
-/**
- * Created by hsleep([email protected]) on 15. 3. 17..
- */
-object CounterEtlFunctions extends Logging {
-  lazy val filterOps = Seq("insert", "insertBulk", "update", "increment").map(op => GraphUtil.operations(op))
-  lazy val preFetchSize = StreamingConfig.PROFILE_PREFETCH_SIZE
-  lazy val config = S2ConfigFactory.config
-  lazy val counterModel = new CounterModel(config)
-
-  def logToEdge(line: String): Option[Edge] = {
-    for {
-      elem <- Graph.toGraphElement(line) if elem.isInstanceOf[Edge]
-      edge <- Some(elem.asInstanceOf[Edge]).filter { x =>
-        filterOps.contains(x.op)
-      }
-    } yield {
-      edge
-    }
-  }
-
-  def parseEdgeFormat(line: String): Option[CounterEtlItem] = {
-    /**
-     * 1427082276804   insert  edge    19073318        52453027_93524145648511699      story_user_ch_doc_view  {"doc_type" : "l", "channel_subscribing" : "y", "view_from" : "feed"}
-     */
-    for {
-      elem <- Graph.toGraphElement(line) if elem.isInstanceOf[Edge]
-      edge <- Some(elem.asInstanceOf[Edge]).filter { x =>
-        filterOps.contains(x.op)
-      }
-    } yield {
-      val label = edge.label
-      val labelName = label.label
-      val tgtService = label.tgtColumn.service.serviceName
-      val tgtId = edge.tgtVertex.innerId.toString()
-      val srcId = edge.srcVertex.innerId.toString()
-
-      // make empty property if no exist edge property
-      val dimension = Json.parse(Some(GraphUtil.split(line)).filter(_.length >= 7).map(_(6)).getOrElse("{}"))
-      val bucketKeys = Seq("_from")
-      val bucketKeyValues = {
-        for {
-          variable <- bucketKeys
-        } yield {
-          val jsValue = variable match {
-            case "_from" => JsString(srcId)
-            case s => dimension \ s
-          }
-          s"[[$variable]]" -> jsValue
-        }
-      }
-      val property = Json.toJson(bucketKeyValues :+ ("value" -> JsString("1")) toMap)
-//      val property = Json.toJson(Map("_from" -> srcId, "_to" -> tgtId, "value" -> "1"))
-
-      CounterEtlItem(edge.ts, tgtService, labelName, tgtId, dimension, property)
-    }
-  }
-
-  def parseEdgeFormat(lines: List[String]): List[CounterEtlItem] = {
-    for {
-      line <- lines
-      item <- parseEdgeFormat(line)
-    } yield {
-      item
-    }
-  }
-  
-  def checkPolicyAndMergeDimension(service: String, action: String, items: List[CounterEtlItem]): List[CounterEtlItem] = {
-    counterModel.findByServiceAction(service, action).map { policy =>
-      if (policy.useProfile) {
-        policy.bucketImpId match {
-          case Some(_) => DimensionProps.mergeDimension(policy, items)
-          case None => Nil
-        }
-      } else {
-        items
-      }
-    }.getOrElse(Nil)
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2counter_loader/src/main/scala/s2/counter/core/CounterEtlItem.scala
----------------------------------------------------------------------
diff --git a/s2counter_loader/src/main/scala/s2/counter/core/CounterEtlItem.scala b/s2counter_loader/src/main/scala/s2/counter/core/CounterEtlItem.scala
deleted file mode 100644
index 1b0f3cd..0000000
--- a/s2counter_loader/src/main/scala/s2/counter/core/CounterEtlItem.scala
+++ /dev/null
@@ -1,43 +0,0 @@
-package s2.counter.core
-
-import org.slf4j.LoggerFactory
-import play.api.libs.json._
-import s2.util.UnitConverter
-
-import scala.util.{Failure, Success, Try}
-
-/**
-*  Created by hsleep([email protected]) on 15. 10. 6..
-*/
-case class CounterEtlItem(ts: Long, service: String, action: String, item: String, dimension: JsValue, property: JsValue, useProfile: Boolean = false) {
-  def toKafkaMessage: String = {
-    s"$ts\t$service\t$action\t$item\t${dimension.toString()}\t${property.toString()}"
-  }
-
-  lazy val value = {
-    property \ "value" match {
-      case JsNumber(n) => n.longValue()
-      case JsString(s) => s.toLong
-      case _: JsUndefined => 1L
-      case _ => throw new Exception("wrong type")
-    }
-  }
-}
-
-object CounterEtlItem {
-  val log = LoggerFactory.getLogger(this.getClass)
-
-  def apply(line: String): Option[CounterEtlItem] = {
-    Try {
-      val Array(ts, service, action, item, dimension, property) = line.split('\t')
-      CounterEtlItem(UnitConverter.toMillis(ts.toLong), service, action, item, Json.parse(dimension), Json.parse(property))
-    } match {
-      case Success(item) =>
-        Some(item)
-      case Failure(ex) =>
-        log.error(">>> failed")
-        log.error(s"${ex.toString}: $line")
-        None
-    }
-  }
-}
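
CounterEtlItem.apply above expects one counter event per line with six tab-separated fields: timestamp, service, action, item, dimension JSON, and property JSON. A quick illustration of that wire format, with made-up values:

    // ts \t service \t action \t item \t dimension(json) \t property(json)
    val line = "1435107139287\ttest\ttest_case\titem_1\t{\"country\":\"kr\"}\t{\"value\":\"1\"}"
    val Array(ts, service, action, item, dimension, property) = line.split('\t')
    // ts = "1435107139287", service = "test", action = "test_case", item = "item_1"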

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2counter_loader/src/main/scala/s2/counter/core/CounterFunctions.scala
----------------------------------------------------------------------
diff --git a/s2counter_loader/src/main/scala/s2/counter/core/CounterFunctions.scala b/s2counter_loader/src/main/scala/s2/counter/core/CounterFunctions.scala
deleted file mode 100644
index a36b55f..0000000
--- a/s2counter_loader/src/main/scala/s2/counter/core/CounterFunctions.scala
+++ /dev/null
@@ -1,446 +0,0 @@
-package s2.counter.core
-
-import com.kakao.s2graph.core.GraphUtil
-import kafka.producer.KeyedMessage
-import org.apache.spark.rdd.RDD
-import org.apache.spark.{Accumulable, Logging}
-import play.api.libs.json.{JsString, JsNumber, JsValue, Json}
-import s2.config.{S2ConfigFactory, StreamingConfig}
-import s2.counter.TrxLog
-import s2.counter.core.ExactCounter.ExactValueMap
-import s2.counter.core.RankingCounter.RankingValueMap
-import s2.counter.core.TimedQualifier.IntervalUnit
-import s2.counter.core.v2.{ExactStorageGraph, RankingStorageGraph}
-import s2.models.{Counter, DBModel, DefaultCounterModel}
-import s2.spark.WithKafka
-
-import scala.collection.mutable.{HashMap => MutableHashMap}
-import scala.language.postfixOps
-import scala.util.Try
-
-/**
- * Created by hsleep([email protected]) on 15. 10. 6..
- */
-object CounterFunctions extends Logging with WithKafka {
-  import scala.concurrent.ExecutionContext.Implicits.global
-
-  private val K_MAX = 500
-
-  val exactCounter = new ExactCounter(S2ConfigFactory.config, new ExactStorageGraph(S2ConfigFactory.config))
-  val rankingCounter = new RankingCounter(S2ConfigFactory.config, new RankingStorageGraph(S2ConfigFactory.config))
-
-  lazy val producer = getProducer[String, String](StreamingConfig.KAFKA_BROKERS)
-
-  type HashMapAccumulable = Accumulable[MutableHashMap[String, Long], (String, Long)]
-
-  val initialize = {
-    logInfo("initialize CounterFunctions")
-    DBModel.initialize(S2ConfigFactory.config)
-    true
-  }
-
-  def getCountValue(policy: Counter, item: CounterEtlItem): ExactValueMap = {
-    for {
-      dimKeys <- policy.dimensionList
-      dimValues <- getDimensionValues(item.dimension, dimKeys).toSeq
-      eq <- ExactQualifier.getQualifiers(policy.intervals.map(IntervalUnit.withName), item.ts, dimKeys.zip(dimValues).toMap)
-    } yield {
-      eq -> item.value
-    }
-  }.toMap
-
-  def getDimensionValues(dimension: JsValue, keys: Array[String]): Option[Array[String]] = {
-    Try {
-      for {
-        k <- keys
-        jsValue = dimension \ k
-      } yield {
-        jsValue match {
-          case JsNumber(n) => n.toString()
-          case JsString(s) => s
-          case _ => throw new Exception()
-        }
-      }
-    }.toOption
-  }
-  
-  def exactMapper(item: CounterEtlItem): Option[(ExactKeyTrait, ExactValueMap)] = {
-    DefaultCounterModel.findByServiceAction(item.service, item.action).map { policy =>
-      (ExactKey(policy, item.item, checkItemType = true), getCountValue(policy, item))
-    }
-  }
-
-  def rankingMapper(row: ItemRankingRow): Seq[(RankingKey, RankingValueMap)] = {
-    // normal ranking
-    for {
-      (eq, rv) <- row.value
-    } yield {
-      (RankingKey(row.key.policyId, row.key.version, eq), Map(row.key.itemKey -> rv))
-    }
-  }.toSeq
-
-  def logToRankValue(log: TrxLog): Option[(ExactKeyTrait, Map[ExactQualifier, RankingValue])] = {
-    DefaultCounterModel.findById(log.policyId).map { policy =>
-      val key = ExactKey(policy, log.item, checkItemType = false)
-      val value = {
-        for {
-          result <- log.results
-        } yield {
-          ExactQualifier(TimedQualifier(result.interval, result.ts), result.dimension) -> RankingValue(result.result, result.value)
-        }
-      }.toMap
-      key -> value
-    }
-  }
-
-  def reduceValue[T, U](op: (U, U) => U, default: U)(m1: Map[T, U], m2: Map[T, U]): Map[T, U] = {
-    m1 ++ m2.map { case (k, v) =>
-      k -> op(m1.getOrElse(k, default), v)
-    }
-  }
-
-  def makeExactRdd(rdd: RDD[(String, String)], numPartitions: Int): RDD[(ExactKeyTrait, ExactValueMap)] = {
-    rdd.mapPartitions { part =>
-      assert(initialize)
-      for {
-        (k, v) <- part
-        line <- GraphUtil.parseString(v)
-        item <- CounterEtlItem(line).toSeq
-        ev <- exactMapper(item).toSeq
-      } yield {
-        ev
-      }
-    }.reduceByKey(reduceValue[ExactQualifier, Long](_ + _, 0L)(_, _), numPartitions)
-  }
-
-  def makeRankingRdd(rdd: RDD[(String, String)], numPartitions: Int): RDD[(RankingKey, RankingValueMap)] = {
-    val logRdd = makeTrxLogRdd(rdd, numPartitions)
-    makeRankingRddFromTrxLog(logRdd, numPartitions)
-  }
-
-  def makeRankingRddFromTrxLog(rdd: RDD[TrxLog], numPartitions: Int): RDD[(RankingKey, RankingValueMap)] = {
-    val itemRankingRdd = makeItemRankingRdd(rdd, numPartitions).cache()
-    try {
-      rankingCount(itemRankingRdd, numPartitions) union
-        rateRankingCount(itemRankingRdd, numPartitions) union
-        trendRankingCount(itemRankingRdd, numPartitions) coalesce numPartitions
-    } finally {
-      itemRankingRdd.unpersist(false)
-    }
-  }
-
-  def makeTrxLogRdd(rdd: RDD[(String, String)], numPartitions: Int): RDD[TrxLog] = {
-    rdd.mapPartitions { part =>
-      assert(initialize)
-      for {
-        (k, v) <- part
-        line <- GraphUtil.parseString(v)
-        trxLog = Json.parse(line).as[TrxLog] if trxLog.success
-      } yield {
-        trxLog
-      }
-    }
-  }
-
-  def rankingCount(rdd: RDD[ItemRankingRow], numPartitions: Int): RDD[(RankingKey, RankingValueMap)] = {
-    rdd.mapPartitions { part =>
-      for {
-        row <- part
-        rv <- rankingMapper(row)
-      } yield {
-        rv
-      }
-    }.reduceByKey(reduceValue(RankingValue.reduce, RankingValue(0, 0))(_, _), numPartitions)
-  }
-
-  case class ItemRankingRow(key: ExactKeyTrait, value: Map[ExactQualifier, RankingValue])
-
-  def makeItemRankingRdd(rdd: RDD[TrxLog], numPartitions: Int): RDD[ItemRankingRow] = {
-    rdd.mapPartitions { part =>
-      for {
-        log <- part
-        rv <- logToRankValue(log)
-      } yield {
-        rv
-      }
-    }.reduceByKey(reduceValue(RankingValue.reduce, RankingValue(0, 0))(_, _), numPartitions).mapPartitions { part =>
-      for {
-        (key, value) <- part
-      } yield {
-        ItemRankingRow(key, value)
-      }
-    }
-  }
-
-  def mapTrendRankingValue(rows: Seq[ItemRankingRow]): Seq[(ExactKeyTrait, Map[ExactQualifier, RateRankingValue])] = {
-    for {
-      row <- rows
-      trendPolicy <- DefaultCounterModel.findByTrendActionId(row.key.policyId)
-    } yield {
-      val key = ExactKey(trendPolicy, row.key.itemKey, checkItemType = false)
-      val value = row.value.filter { case (eq, rv) =>
-        // eq filter by rate policy dimension
-        trendPolicy.dimensionSet.exists { dimSet => dimSet == eq.dimKeyValues.keys }
-      }.map { case (eq, rv) =>
-        eq -> RateRankingValue(rv.score, -1)
-      }
-      (key, value)
-    }
-  }
-
-  def mapRateRankingValue(rows: Seq[ItemRankingRow]): Seq[(ExactKeyTrait, Map[ExactQualifier, RateRankingValue])] = {
-    val actionPart = {
-      for {
-        row <- rows
-        ratePolicy <- DefaultCounterModel.findByRateActionId(row.key.policyId)
-      } yield {
-        val key = ExactKey(ratePolicy, row.key.itemKey, checkItemType = false)
-        val value = row.value.filter { case (eq, rv) =>
-          // eq filter by rate policy dimension
-          ratePolicy.dimensionSet.exists { dimSet => dimSet == eq.dimKeyValues.keys }
-        }.map { case (eq, rv) =>
-          eq -> RateRankingValue(rv.score, -1)
-        }
-        (key, value)
-      }
-    }
-
-    val basePart = {
-      for {
-        row <- rows
-        ratePolicy <- DefaultCounterModel.findByRateBaseId(row.key.policyId)
-      } yield {
-        val key = ExactKey(ratePolicy, row.key.itemKey, checkItemType = false)
-        val value = row.value.filter { case (eq, rv) =>
-          // eq filter by rate policy dimension
-          ratePolicy.dimensionSet.exists { dimSet => dimSet == eq.dimKeyValues.keys }
-        }.map { case (eq, rv) =>
-          eq -> RateRankingValue(-1, rv.score)
-        }
-        (key, value)
-      }
-    }
-
-    actionPart ++ basePart
-  }
-
-  def trendRankingCount(rdd: RDD[ItemRankingRow], numPartitions: Int): RDD[(RankingKey, RankingValueMap)] = {
-    rdd.mapPartitions { part =>
-      mapTrendRankingValue(part.toSeq) toIterator
-    }.reduceByKey(reduceValue(RateRankingValue.reduce, RateRankingValue(-1, -1))(_, _), numPartitions).mapPartitions { part =>
-      val missingByPolicy = {
-        for {
-          (key, value) <- part.toSeq
-          trendPolicy <- DefaultCounterModel.findById(key.policyId).toSeq
-          actionId <- trendPolicy.rateActionId.toSeq
-          actionPolicy <- DefaultCounterModel.findById(actionId).toSeq
-        } yield {
-          // filter total eq
-          val missingQualifiersWithRRV = value.filterKeys { eq => eq.tq.q != IntervalUnit.TOTAL }
-          (actionPolicy, key, missingQualifiersWithRRV)
-        }
-      }.groupBy(_._1).mapValues(seq => seq.map(x => (x._2, x._3)))
-
-      val filled = {
-        for {
-          (policy, missing) <- missingByPolicy.toSeq
-          keyWithPast = exactCounter.getPastCounts(policy, missing.map { case (k, v) => k.itemKey -> v.keys.toSeq })
-          (key, current) <- missing
-        } yield {
-          val past = keyWithPast.getOrElse(key.itemKey, Map.empty[ExactQualifier, Long])
-          val base = past.mapValues(l => RateRankingValue(-1, l))
-//          log.warn(s"trend: $policy $key -> $current $base")
-          key -> reduceValue(RateRankingValue.reduce, RateRankingValue(-1, -1))(current, base)
-        }
-      }
-
-//      filled.foreach(println)
-
-      {
-        // filter by rate threshold
-        for {
-          (key, value) <- filled
-          ratePolicy <- DefaultCounterModel.findById(key.policyId).toSeq
-          (eq, rrv) <- value if rrv.baseScore >= ratePolicy.rateThreshold.getOrElse(Int.MinValue)
-        } yield {
-          (RankingKey(key.policyId, key.version, eq), Map(key.itemKey -> rrv.rankingValue))
-        }
-      } toIterator
-    }.reduceByKey(reduceValue(RankingValue.reduce, RankingValue(0, 0))(_, _), numPartitions)
-  }
-
-  def rateRankingCount(rdd: RDD[ItemRankingRow], numPartitions: Int): RDD[(RankingKey, RankingValueMap)] = {
-    rdd.mapPartitions { part =>
-      mapRateRankingValue(part.toSeq) toIterator
-    }.reduceByKey(reduceValue(RateRankingValue.reduce, RateRankingValue(-1, -1))(_, _), numPartitions).mapPartitions { part =>
-      val seq = part.toSeq
-//      seq.foreach(x => println(s"item ranking row>> $x"))
-
-      // find and evaluate action value is -1
-      val actionMissingByPolicy = {
-        for {
-          (key, value) <- seq if value.exists { case (eq, rrv) => rrv.actionScore == -1 }
-          ratePolicy <- DefaultCounterModel.findById(key.policyId).toSeq
-          actionId <- ratePolicy.rateActionId.toSeq
-          actionPolicy <- DefaultCounterModel.findById(actionId)
-        } yield {
-          (actionPolicy, key, value.filter { case (eq, rrv) => rrv.actionScore == -1 })
-        }
-      }.groupBy(_._1).mapValues(seq => seq.map(x => (x._2, x._3)))
-
-      val actionFilled = {
-        for {
-          (actionPolicy, actionMissing) <- actionMissingByPolicy.toSeq
-          keyWithRelated = exactCounter.getRelatedCounts(actionPolicy, actionMissing.map { case (k, v) => k.itemKey -> v.keys.toSeq })
-          (key, current) <- actionMissing
-        } yield {
-          val related = keyWithRelated.getOrElse(key.itemKey, Map.empty[ExactQualifier, Long])
-          val found = related.mapValues(l => RateRankingValue(l, -1))
-          val filled = reduceValue(RateRankingValue.reduce, RateRankingValue(-1, -1))(current, found)
-//          log.warn(s"action: $key -> $found $filled")
-          key -> filled
-        }
-      }
-
-//      actionFilled.foreach(x => println(s"action filled>> $x"))
-
-      // find and evaluate base value is -1
-      val baseMissingByPolicy = {
-        for {
-          (key, value) <- seq if value.exists { case (eq, rrv) => rrv.baseScore == -1 }
-          ratePolicy <- DefaultCounterModel.findById(key.policyId).toSeq
-          baseId <- ratePolicy.rateBaseId.toSeq
-          basePolicy <- DefaultCounterModel.findById(baseId)
-        } yield {
-          (basePolicy, key, value.filter { case (eq, rrv) => rrv.baseScore == -1 })
-        }
-      }.groupBy(_._1).mapValues(seq => seq.map(x => (x._2, x._3)))
-
-      val baseFilled = {
-        for {
-          (basePolicy, baseMissing) <- baseMissingByPolicy.toSeq
-          keyWithRelated = exactCounter.getRelatedCounts(basePolicy, baseMissing.map { case (k, v) => k.itemKey -> v.keys.toSeq })
-          (key, current) <- baseMissing
-        } yield {
-          val related = keyWithRelated.getOrElse(key.itemKey, Map.empty[ExactQualifier, Long])
-          val found = related.mapValues(l => RateRankingValue(-1, l))
-          val filled = reduceValue(RateRankingValue.reduce, RateRankingValue(-1, -1))(current, found)
-//          log.warn(s"base: $basePolicy $key -> $found $filled")
-          key -> filled
-        }
-      }
-
-//      baseFilled.foreach(x => println(s"base filled>> $x"))
-
-      val alreadyFilled = {
-        for {
-          (key, value) <- seq if value.exists { case (eq, rrv) => rrv.actionScore != -1 && rrv.baseScore != -1 }
-        } yield {
-          key -> value.filter { case (eq, rrv) => rrv.actionScore != -1 && rrv.baseScore != -1 }
-        }
-      }
-
-      val rtn = {
-        // filter by rate threshold
-        for {
-          (key, value) <- actionFilled ++ baseFilled ++ alreadyFilled
-          ratePolicy <- DefaultCounterModel.findById(key.policyId).toSeq
-          (eq, rrv) <- value if rrv.baseScore >= ratePolicy.rateThreshold.getOrElse(Int.MinValue)
-        } yield {
-          (RankingKey(key.policyId, key.version, eq), Map(key.itemKey -> rrv.rankingValue))
-        }
-      }
-      rtn.toIterator
-    }.reduceByKey(reduceValue(RankingValue.reduce, RankingValue(0, 0))(_, _), numPartitions)
-  }
-
-  def insertBlobValue(keys: Seq[BlobExactKey], acc: HashMapAccumulable): Unit = {
-    val keyByPolicy = {
-      for {
-        key <- keys
-        policy <- DefaultCounterModel.findById(key.policyId)
-      } yield {
-        (policy, key)
-      }
-    }.groupBy(_._1).mapValues(values => values.map(_._2))
-
-    for {
-      (policy, allKeys) <- keyByPolicy
-      keys <- allKeys.grouped(10)
-      success <- exactCounter.insertBlobValue(policy, keys)
-    } yield {
-      success match {
-        case true => acc += ("BLOB", 1)
-        case false => acc += ("BLOBFailed", 1)
-      }
-    }
-  }
-
-  def updateExactCounter(counts: Seq[(ExactKeyTrait, ExactValueMap)], acc: HashMapAccumulable): Seq[TrxLog] = {
-    val countsByPolicy = {
-      for {
-        (key, count) <- counts
-        policy <- DefaultCounterModel.findById(key.policyId)
-      } yield {
-        (policy, (key, count))
-      }
-    }.groupBy { case (policy, _) => policy }.mapValues(values => values.map(_._2))
-
-    for {
-      (policy, allCounts) <- countsByPolicy
-      counts <- allCounts.grouped(10)
-      trxLog <- exactCounter.updateCount(policy, counts)
-    } yield {
-      trxLog.success match {
-        case true => acc += (s"ExactV${policy.version}", 1)
-        case false => acc += (s"ExactFailedV${policy.version}", 1)
-      }
-      trxLog
-    }
-  }.toSeq
-
-  def exactCountFromEtl(rdd: RDD[CounterEtlItem], numPartitions: Int): RDD[(ExactKeyTrait, ExactValueMap)] = {
-    rdd.mapPartitions { part =>
-      for {
-        item <- part
-        ev <- exactMapper(item).toSeq
-      } yield {
-        ev
-      }
-    }.reduceByKey(reduceValue[ExactQualifier, Long](_ + _, 0L)(_, _), numPartitions)
-  }
-
-  def updateRankingCounter(values: TraversableOnce[(RankingKey, RankingValueMap)], acc: HashMapAccumulable): Unit = {
-    assert(initialize)
-    val valuesByPolicy = {
-      for {
-        (key, value) <- values.toSeq
-        policy <- DefaultCounterModel.findById(key.policyId)
-        if policy.useRank && rankingCounter.ready(policy) // update only rank counter enabled and ready
-      } yield {
-        (policy, (key, value))
-      }
-    }.groupBy { case (policy, _) => policy }.mapValues(values => values.map(_._2))
-
-    for {
-      (policy, allValues) <- valuesByPolicy
-      groupedValues <- allValues.grouped(10)
-    } {
-      rankingCounter.update(groupedValues, K_MAX)
-      acc += (s"RankingV${policy.version}", groupedValues.length)
-    }
-  }
-  
-  def produceTrxLog(trxLogs: TraversableOnce[TrxLog]): Unit = {
-    for {
-      trxLog <- trxLogs
-    } {
-      val topic = trxLog.success match {
-        case true => StreamingConfig.KAFKA_TOPIC_COUNTER_TRX
-        case false => StreamingConfig.KAFKA_TOPIC_COUNTER_FAIL
-      }
-      val msg = new KeyedMessage[String, String](topic, s"${trxLog.policyId}${trxLog.item}", Json.toJson(trxLog).toString())
-      producer.send(msg)
-    }
-  }
-}
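
The reduceValue helper above is the generic merge that this file feeds to Spark's reduceByKey: two maps are combined key by key with a caller-supplied operation, falling back to a default for keys present on only one side. A tiny self-contained illustration specialised to Long counts (simplified types, not the project's API):

    def mergeCounts[K](m1: Map[K, Long], m2: Map[K, Long]): Map[K, Long] =
      m1 ++ m2.map { case (k, v) => k -> (m1.getOrElse(k, 0L) + v) }

    // mergeCounts(Map("a" -> 1L), Map("a" -> 2L, "b" -> 5L))
    // => Map("a" -> 3, "b" -> 5)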

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2counter_loader/src/main/scala/s2/counter/core/DimensionProps.scala
----------------------------------------------------------------------
diff --git a/s2counter_loader/src/main/scala/s2/counter/core/DimensionProps.scala b/s2counter_loader/src/main/scala/s2/counter/core/DimensionProps.scala
deleted file mode 100644
index 98bc750..0000000
--- a/s2counter_loader/src/main/scala/s2/counter/core/DimensionProps.scala
+++ /dev/null
@@ -1,154 +0,0 @@
-package s2.counter.core
-
-import com.kakao.s2graph.core.mysqls.{Bucket, Experiment, Service}
-import org.apache.commons.httpclient.HttpStatus
-import org.slf4j.LoggerFactory
-import play.api.libs.json._
-import s2.config.StreamingConfig
-import s2.models.Counter
-import s2.util.{CollectionCache, CollectionCacheConfig, RetryAsync}
-
-import scala.annotation.tailrec
-import scala.concurrent.ExecutionContext.Implicits.global
-import scala.concurrent.duration._
-import scala.concurrent.{Await, Future}
-import scala.util.Try
-
-/**
- * Created by hsleep([email protected]) on 2015. 10. 6..
- */
-object DimensionProps {
-  // using play-ws without play app
-  private val builder = new com.ning.http.client.AsyncHttpClientConfig.Builder()
-  private val client = new play.api.libs.ws.ning.NingWSClient(builder.build)
-  private val log = LoggerFactory.getLogger(this.getClass)
-
-  private val retryCnt = 3
-
-  val cacheConfig = CollectionCacheConfig(StreamingConfig.PROFILE_CACHE_MAX_SIZE,
-    StreamingConfig.PROFILE_CACHE_TTL_SECONDS,
-    negativeCache = true,
-    3600 // negative ttl 1 hour
-  )
-  val cache: CollectionCache[Option[JsValue]] = new CollectionCache[Option[JsValue]](cacheConfig)
-
-  @tailrec
-  private[counter] def makeRequestBody(requestBody: String, keyValues: List[(String, String)]): String = {
-    keyValues match {
-      case head :: tail =>
-        makeRequestBody(requestBody.replace(head._1, head._2), tail)
-      case Nil => requestBody
-    }
-  }
-
-  private[counter] def query(bucket: Bucket, item: CounterEtlItem): Future[Option[JsValue]] = {
-    val keyValues = (item.dimension.as[JsObject] ++ item.property.as[JsObject] fields)
-      .filter { case (key, _) => key.startsWith("[[") && key.endsWith("]]") }
-      .map { case (key, jsValue) =>
-        val replacement = jsValue match {
-          case JsString(s) => s
-          case value => value.toString()
-        }
-        key -> replacement
-      }.toList
-
-    val cacheKey = s"${bucket.impressionId}=" + keyValues.flatMap(x => Seq(x._1, x._2)).mkString("_")
-
-    cache.withCacheAsync(cacheKey) {
-      val retryFuture = RetryAsync(retryCnt, withSleep = false) {
-        val future = bucket.httpVerb.toUpperCase match {
-          case "GET" =>
-            client.url(bucket.apiPath).get()
-          case "POST" =>
-            val newBody = makeRequestBody(bucket.requestBody, keyValues)
-            client.url(bucket.apiPath).post(Json.parse(newBody))
-        }
-
-        future.map { resp =>
-          resp.status match {
-            case HttpStatus.SC_OK =>
-              val json = Json.parse(resp.body)
-              for {
-                results <- (json \ "results").asOpt[Seq[JsValue]]
-                result <- results.headOption
-                props <- (result \ "props").asOpt[JsValue]
-              } yield {
-                props
-              }
-            case _ =>
-              log.error(s"${resp.body}(${resp.status}}) item: $item")
-              None
-          }
-        }
-      }
-
-      // if fail to retry
-      retryFuture onFailure { case t => log.error(s"${t.getMessage} item: $item") }
-
-      retryFuture
-    }
-  }
-
-  private[counter] def query(service: Service, experiment: Experiment, item: CounterEtlItem): Future[Option[JsValue]] = {
-    val keyValues = (item.dimension.as[JsObject] ++ item.property.as[JsObject] fields)
-      .filter { case (key, _) => key.startsWith("[[") && key.endsWith("]]") }.toMap
-
-    val cacheKey = s"${experiment.name}=" + keyValues.flatMap(x => Seq(x._1, x._2)).mkString("_")
-
-    cache.withCacheAsync(cacheKey) {
-      val retryFuture = RetryAsync(retryCnt, withSleep = false) {
-        val url = s"${StreamingConfig.GRAPH_URL}/graphs/experiment/${service.accessToken}/${experiment.name}/0"
-        val future = client.url(url).post(Json.toJson(keyValues))
-
-        future.map { resp =>
-          resp.status match {
-            case HttpStatus.SC_OK =>
-              val json = Json.parse(resp.body)
-              for {
-                results <- (json \ "results").asOpt[Seq[JsValue]]
-                result <- results.headOption
-                props <- (result \ "props").asOpt[JsValue]
-              } yield {
-                props
-              }
-            case _ =>
-              log.error(s"${resp.body}(${resp.status}}) item: $item")
-              None
-          }
-        }
-      }
-
-      // if fail to retry
-      retryFuture onFailure { case t => log.error(s"${t.getMessage} item: $item") }
-
-      retryFuture
-    }
-  }
-
-  def mergeDimension(policy: Counter, items: List[CounterEtlItem]): List[CounterEtlItem] = {
-    for {
-      impId <- policy.bucketImpId
-      bucket <- Bucket.findByImpressionId(impId)
-      experiment <- Experiment.findById(bucket.experimentId)
-      service <- Try { Service.findById(experiment.serviceId) }.toOption
-    } yield {
-      val futures = {
-        for {
-          item <- items
-        } yield {
-          query(service, experiment, item).map {
-            case Some(jsValue) =>
-              val newDimension = item.dimension.as[JsObject] ++ jsValue.as[JsObject]
-              item.copy(dimension = newDimension)
-            case None => item
-          }
-        }
-      }
-      Await.result(Future.sequence(futures), 10 seconds)
-    }
-  }.getOrElse(items)
-
-  def getCacheStatsString: String = {
-    cache.getStatsString
-  }
-}
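
makeRequestBody above is a plain recursive placeholder substitution: every "[[key]]" token in a bucket's request-body template is replaced by the value resolved from the item's dimension and property fields. A standalone sketch of the same pattern (illustrative names only, not the project's API):

    // mirrors the tail-recursive replace loop in makeRequestBody
    def substitute(template: String, keyValues: List[(String, String)]): String =
      keyValues match {
        case (token, value) :: tail => substitute(template.replace(token, value), tail)
        case Nil                    => template
      }

    // substitute("""{"_from": [[_from]]}""", List("[[_from]]" -> "1"))
    // returns {"_from": 1}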

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2counter_loader/src/main/scala/s2/counter/stream/EtlStreaming.scala
----------------------------------------------------------------------
diff --git a/s2counter_loader/src/main/scala/s2/counter/stream/EtlStreaming.scala b/s2counter_loader/src/main/scala/s2/counter/stream/EtlStreaming.scala
deleted file mode 100644
index 03c42b5..0000000
--- a/s2counter_loader/src/main/scala/s2/counter/stream/EtlStreaming.scala
+++ /dev/null
@@ -1,116 +0,0 @@
-package s2.counter.stream
-
-import com.kakao.s2graph.core.{Graph, GraphUtil}
-import kafka.producer.KeyedMessage
-import kafka.serializer.StringDecoder
-import org.apache.spark.streaming.Durations._
-import org.apache.spark.streaming.kafka.KafkaRDDFunctions.rddToKafkaRDDFunctions
-import org.apache.spark.streaming.kafka.StreamHelper
-import s2.config.{S2ConfigFactory, S2CounterConfig, StreamingConfig}
-import s2.counter.core.{CounterEtlFunctions, CounterEtlItem, DimensionProps}
-import s2.models.{CounterModel, DBModel}
-import s2.spark.{HashMapParam, SparkApp, WithKafka}
-
-import scala.collection.mutable
-import scala.collection.mutable.{HashMap => MutableHashMap}
-import scala.concurrent.ExecutionContext
-
-/**
-  * Created by hsleep([email protected]) on 15. 10. 6..
-  */
-object EtlStreaming extends SparkApp with WithKafka {
-  lazy val config = S2ConfigFactory.config
-  lazy val s2Config = new S2CounterConfig(config)
-  lazy val counterModel = new CounterModel(config)
-  lazy val className = getClass.getName.stripSuffix("$")
-  lazy val producer = getProducer[String, String](StreamingConfig.KAFKA_BROKERS)
-
-  implicit val graphEx = ExecutionContext.Implicits.global
-
-  val initialize = {
-    println("streaming initialize")
-//    Graph(config)
-    DBModel.initialize(config)
-    true
-  }
-
-  val inputTopics = Set(StreamingConfig.KAFKA_TOPIC_ETL)
-  val strInputTopics = inputTopics.mkString(",")
-  val groupId = buildKafkaGroupId(strInputTopics, "etl_to_counter")
-  val kafkaParam = Map(
-    "group.id" -> groupId,
-    "metadata.broker.list" -> StreamingConfig.KAFKA_BROKERS,
-    "zookeeper.connect" -> StreamingConfig.KAFKA_ZOOKEEPER,
-    "zookeeper.connection.timeout.ms" -> "10000"
-  )
-  val streamHelper = StreamHelper(kafkaParam)
-
-  override def run(): Unit = {
-    validateArgument("interval")
-    val (intervalInSec) = seconds(args(0).toLong)
-
-    val conf = sparkConf(s"$strInputTopics: $className")
-    val ssc = streamingContext(conf, intervalInSec)
-    val sc = ssc.sparkContext
-
-    val acc = sc.accumulable(MutableHashMap.empty[String, Long], "Throughput")(HashMapParam[String, Long](_ + _))
-
-    /**
-     * read message from etl topic and join user profile from graph and then produce whole message to counter topic
-     */
-    val stream = streamHelper.createStream[String, String, StringDecoder, StringDecoder](ssc, inputTopics)
-
-    // etl logic
-    stream.foreachRDD { (rdd, ts) =>
-      rdd.foreachPartitionWithOffsetRange { case (osr, part) =>
-        assert(initialize)
-
-        // convert to edge format
-        val items = {
-          for {
-            (k, v) <- part
-            line <- GraphUtil.parseString(v)
-            item <- CounterEtlFunctions.parseEdgeFormat(line)
-          } yield {
-            acc += ("Edges", 1)
-            item
-          }
-        }
-
-        // join user profile
-        val joinItems = items.toList.groupBy { e =>
-          (e.service, e.action)
-        }.flatMap { case ((service, action), v) =>
-          CounterEtlFunctions.checkPolicyAndMergeDimension(service, action, v)
-        }
-
-        // group by kafka partition key and send to kafka
-        val m = MutableHashMap.empty[Int, mutable.MutableList[CounterEtlItem]]
-        joinItems.foreach { item =>
-          if (item.useProfile) {
-            acc += ("ETL", 1)
-          }
-          val k = getPartKey(item.item, 20)
-          val values: mutable.MutableList[CounterEtlItem] = m.getOrElse(k, mutable.MutableList.empty[CounterEtlItem])
-          values += item
-          m.update(k, values)
-        }
-        m.foreach { case (k, v) =>
-          v.map(_.toKafkaMessage).grouped(1000).foreach { grouped =>
-            acc += ("Produce", grouped.size)
-            producer.send(new KeyedMessage[String, String](StreamingConfig.KAFKA_TOPIC_COUNTER, null, k, grouped.mkString("\n")))
-          }
-        }
-
-        streamHelper.commitConsumerOffset(osr)
-      }
-
-      if (ts.milliseconds / 1000 % 60 == 0) {
-        log.warn(DimensionProps.getCacheStatsString)
-      }
-    }
-
-    ssc.start()
-    ssc.awaitTermination()
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2counter_loader/src/main/scala/s2/counter/stream/ExactCounterStreaming.scala
----------------------------------------------------------------------
diff --git a/s2counter_loader/src/main/scala/s2/counter/stream/ExactCounterStreaming.scala b/s2counter_loader/src/main/scala/s2/counter/stream/ExactCounterStreaming.scala
deleted file mode 100644
index 2b8ba21..0000000
--- a/s2counter_loader/src/main/scala/s2/counter/stream/ExactCounterStreaming.scala
+++ /dev/null
@@ -1,72 +0,0 @@
-package s2.counter.stream
-
-import kafka.serializer.StringDecoder
-import org.apache.spark.streaming.Durations._
-import org.apache.spark.streaming.kafka.KafkaRDDFunctions.rddToKafkaRDDFunctions
-import org.apache.spark.streaming.kafka.{HasOffsetRanges, StreamHelper}
-import s2.config.{S2ConfigFactory, S2CounterConfig, StreamingConfig}
-import s2.counter.core.CounterFunctions
-import s2.spark.{HashMapParam, SparkApp, WithKafka}
-
-import scala.collection.mutable.{HashMap => MutableHashMap}
-import scala.language.postfixOps
-
-/**
- * Streaming job for counter topic
- * Created by hsleep([email protected]) on 15. 1. 15..
- */
-object ExactCounterStreaming extends SparkApp with WithKafka {
-  lazy val config = S2ConfigFactory.config
-  lazy val s2Config = new S2CounterConfig(config)
-  lazy val className = getClass.getName.stripSuffix("$")
-
-  lazy val producer = getProducer[String, String](StreamingConfig.KAFKA_BROKERS)
-
-  val inputTopics = Set(StreamingConfig.KAFKA_TOPIC_COUNTER)
-  val strInputTopics = inputTopics.mkString(",")
-  val groupId = buildKafkaGroupId(strInputTopics, "counter_v2")
-  val kafkaParam = Map(
-//    "auto.offset.reset" -> "smallest",
-    "group.id" -> groupId,
-    "metadata.broker.list" -> StreamingConfig.KAFKA_BROKERS,
-    "zookeeper.connect" -> StreamingConfig.KAFKA_ZOOKEEPER,
-    "zookeeper.connection.timeout.ms" -> "10000"
-  )
-  val streamHelper = StreamHelper(kafkaParam)
-
-  override def run() = {
-    validateArgument("interval", "clear")
-    val (intervalInSec, clear) = (seconds(args(0).toLong), args(1).toBoolean)
-
-    if (clear) {
-      streamHelper.kafkaHelper.consumerGroupCleanup()
-    }
-
-    val conf = sparkConf(s"$strInputTopics: $className")
-    val ssc = streamingContext(conf, intervalInSec)
-    val sc = ssc.sparkContext
-
-    implicit val acc: HashMapAccumulable = sc.accumulable(MutableHashMap.empty[String, Long], "Throughput")(HashMapParam[String, Long](_ + _))
-
-    // make stream
-    val stream = streamHelper.createStream[String, String, StringDecoder, StringDecoder](ssc, inputTopics)
-    stream.foreachRDD { (rdd, ts) =>
-      val offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
-
-      val exactRDD = CounterFunctions.makeExactRdd(rdd, offsets.length)
-
-      // for at-least once semantic
-      exactRDD.foreachPartitionWithIndex { (i, part) =>
-        // update exact counter
-        val trxLogs = CounterFunctions.updateExactCounter(part.toSeq, acc)
-        CounterFunctions.produceTrxLog(trxLogs)
-
-        // commit offset range
-        streamHelper.commitConsumerOffset(offsets(i))
-      }
-    }
-
-    ssc.start()
-    ssc.awaitTermination()
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2counter_loader/src/main/scala/s2/counter/stream/GraphToETLStreaming.scala
----------------------------------------------------------------------
diff --git a/s2counter_loader/src/main/scala/s2/counter/stream/GraphToETLStreaming.scala b/s2counter_loader/src/main/scala/s2/counter/stream/GraphToETLStreaming.scala
deleted file mode 100644
index 39654d3..0000000
--- a/s2counter_loader/src/main/scala/s2/counter/stream/GraphToETLStreaming.scala
+++ /dev/null
@@ -1,83 +0,0 @@
-package s2.counter.stream
-
-import com.kakao.s2graph.core.GraphUtil
-import kafka.producer.KeyedMessage
-import kafka.serializer.StringDecoder
-import org.apache.spark.streaming.Durations._
-import org.apache.spark.streaming.kafka.KafkaRDDFunctions.rddToKafkaRDDFunctions
-import s2.config.{S2ConfigFactory, S2CounterConfig, StreamingConfig}
-import s2.spark.{HashMapParam, SparkApp, WithKafka}
-
-import scala.collection.mutable
-import scala.collection.mutable.{HashMap => MutableHashMap}
-
-/**
- * can be @deprecated
- * Created by hsleep([email protected]) on 15. 3. 16..
- */
-object GraphToETLStreaming extends SparkApp with WithKafka {
-  lazy val config = S2ConfigFactory.config
-  lazy val s2Config = new S2CounterConfig(config)
-  lazy val className = getClass.getName.stripSuffix("$")
-  lazy val producer = getProducer[String, String](StreamingConfig.KAFKA_BROKERS)
-
-  override def run(): Unit = {
-    validateArgument("interval", "topic")
-    val (intervalInSec, topic) = (seconds(args(0).toLong), args(1))
-
-    val groupId = buildKafkaGroupId(topic, "graph_to_etl")
-    val kafkaParam = Map(
-//      "auto.offset.reset" -> "smallest",
-      "group.id" -> groupId,
-      "metadata.broker.list" -> StreamingConfig.KAFKA_BROKERS,
-      "zookeeper.connect" -> StreamingConfig.KAFKA_ZOOKEEPER,
-      "zookeeper.connection.timeout.ms" -> "10000"
-    )
-
-    val conf = sparkConf(s"$topic: $className")
-    val ssc = streamingContext(conf, intervalInSec)
-    val sc = ssc.sparkContext
-
-    val acc = sc.accumulable(MutableHashMap.empty[String, Long], "Throughput")(HashMapParam[String, Long](_ + _))
-
-    /**
-     * consume graphIn topic and produce messages to etl topic
-     * two purpose
-     * 1. partition by target vertex id
-     * 2. expand kafka partition count
-     */
-    val stream = getStreamHelper(kafkaParam).createStream[String, String, StringDecoder, StringDecoder](ssc, topic.split(',').toSet)
-    stream.foreachRDD { rdd =>
-      rdd.foreachPartitionWithOffsetRange { case (osr, part) =>
-        val m = MutableHashMap.empty[Int, mutable.MutableList[String]]
-        for {
-          (k, v) <- part
-          line <- GraphUtil.parseString(v)
-        } {
-          try {
-            val sp = GraphUtil.split(line)
-            // get partition key by target vertex id
-            val partKey = getPartKey(sp(4), 20)
-            val values = m.getOrElse(partKey, mutable.MutableList.empty[String])
-            values += line
-            m.update(partKey, values)
-          } catch {
-            case ex: Throwable =>
-              log.error(s"$ex: $line")
-          }
-        }
-
-        m.foreach { case (k, v) =>
-          v.grouped(1000).foreach { grouped =>
-            producer.send(new KeyedMessage[String, String](StreamingConfig.KAFKA_TOPIC_ETL, null, k, grouped.mkString("\n")))
-          }
-        }
-
-        getStreamHelper(kafkaParam).commitConsumerOffset(osr)
-      }
-    }
-
-    ssc.start()
-    ssc.awaitTermination()
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2counter_loader/src/main/scala/s2/counter/stream/RankingCounterStreaming.scala
----------------------------------------------------------------------
diff --git a/s2counter_loader/src/main/scala/s2/counter/stream/RankingCounterStreaming.scala b/s2counter_loader/src/main/scala/s2/counter/stream/RankingCounterStreaming.scala
deleted file mode 100644
index 4c0b927..0000000
--- a/s2counter_loader/src/main/scala/s2/counter/stream/RankingCounterStreaming.scala
+++ /dev/null
@@ -1,74 +0,0 @@
-package s2.counter.stream
-
-import kafka.serializer.StringDecoder
-import org.apache.spark.streaming.Durations._
-import org.apache.spark.streaming.kafka.KafkaRDDFunctions.rddToKafkaRDDFunctions
-import org.apache.spark.streaming.kafka.{HasOffsetRanges, StreamHelper}
-import s2.config.{S2ConfigFactory, S2CounterConfig, StreamingConfig}
-import s2.counter.core.CounterFunctions
-import s2.spark.{HashMapParam, SparkApp, WithKafka}
-
-import scala.collection.mutable.{HashMap => MutableHashMap}
-
-/**
- * Created by hsleep([email protected]) on 15. 6. 19..
- */
-object RankingCounterStreaming extends SparkApp with WithKafka {
-  lazy val config = S2ConfigFactory.config
-  lazy val s2Config = new S2CounterConfig(config)
-  lazy val className = getClass.getName.stripSuffix("$")
-
-  lazy val producer = getProducer[String, String](StreamingConfig.KAFKA_BROKERS)
-
-  val inputTopics = Set(StreamingConfig.KAFKA_TOPIC_COUNTER_TRX)
-  val strInputTopics = inputTopics.mkString(",")
-  val groupId = buildKafkaGroupId(strInputTopics, "ranking_v2")
-  val kafkaParam = Map(
-//    "auto.offset.reset" -> "smallest",
-    "group.id" -> groupId,
-    "metadata.broker.list" -> StreamingConfig.KAFKA_BROKERS,
-    "zookeeper.connect" -> StreamingConfig.KAFKA_ZOOKEEPER,
-    "zookeeper.connection.timeout.ms" -> "10000"
-  )
-  val streamHelper = StreamHelper(kafkaParam)
-
-  override def run() = {
-    validateArgument("interval", "clear")
-    val (intervalInSec, clear) = (seconds(args(0).toLong), args(1).toBoolean)
-
-    if (clear) {
-      streamHelper.kafkaHelper.consumerGroupCleanup()
-    }
-
-    val conf = sparkConf(s"$strInputTopics: $className")
-    val ssc = streamingContext(conf, intervalInSec)
-    val sc = ssc.sparkContext
-
-    implicit val acc: HashMapAccumulable = sc.accumulable(MutableHashMap.empty[String, Long], "Throughput")(HashMapParam[String, Long](_ + _))
-
-    // make stream
-    val stream = streamHelper.createStream[String, String, StringDecoder, StringDecoder](ssc, inputTopics)
-    stream.foreachRDD { (rdd, ts) =>
-      // for at-least once semantic
-      val nextRdd = {
-        CounterFunctions.makeRankingRdd(rdd, sc.defaultParallelism).foreachPartition { part =>
-          // update ranking counter
-          CounterFunctions.updateRankingCounter(part, acc)
-        }
-        rdd
-      }
-
-      streamHelper.commitConsumerOffsets(nextRdd.asInstanceOf[HasOffsetRanges])
-//      CounterFunctions.makeRankingRdd(rdd, offsets.length).foreachPartitionWithIndex { (i, part) =>
-//        // update ranking counter
-//        CounterFunctions.updateRankingCounter(part, acc)
-//
-//        // commit offset range
-//        streamHelper.commitConsumerOffset(offsets(i))
-//      }
-    }
-
-    ssc.start()
-    ssc.awaitTermination()
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2counter_loader/src/main/scala/s2/models/DefaultCounterModel.scala
----------------------------------------------------------------------
diff --git a/s2counter_loader/src/main/scala/s2/models/DefaultCounterModel.scala b/s2counter_loader/src/main/scala/s2/models/DefaultCounterModel.scala
deleted file mode 100644
index 9cbe212..0000000
--- a/s2counter_loader/src/main/scala/s2/models/DefaultCounterModel.scala
+++ /dev/null
@@ -1,8 +0,0 @@
-package s2.models
-
-import s2.config.S2ConfigFactory
-
-/**
- * Created by hsleep([email protected]) on 15. 6. 8..
- */
-case object DefaultCounterModel extends CounterModel(S2ConfigFactory.config)

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2counter_loader/src/test/scala/org/apache/s2graph/counter/loader/core/CounterEtlFunctionsSpec.scala
----------------------------------------------------------------------
diff --git a/s2counter_loader/src/test/scala/org/apache/s2graph/counter/loader/core/CounterEtlFunctionsSpec.scala b/s2counter_loader/src/test/scala/org/apache/s2graph/counter/loader/core/CounterEtlFunctionsSpec.scala
new file mode 100644
index 0000000..14e335e
--- /dev/null
+++ b/s2counter_loader/src/test/scala/org/apache/s2graph/counter/loader/core/CounterEtlFunctionsSpec.scala
@@ -0,0 +1,33 @@
+package org.apache.s2graph.counter.loader.core
+
+import com.typesafe.config.ConfigFactory
+import org.scalatest.{Matchers, BeforeAndAfterAll, FlatSpec}
+import s2.models.DBModel
+
+/**
+ * Created by hsleep([email protected]) on 15. 7. 3..
+ */
+class CounterEtlFunctionsSpec extends FlatSpec with BeforeAndAfterAll with Matchers {
+  override def beforeAll: Unit = {
+    DBModel.initialize(ConfigFactory.load())
+  }
+
+  "CounterEtlFunctions" should "parsing log" in {
+    val data =
+      """
+        |1435107139287 insert  e       aaPHfITGUU0B_150212123559509    abcd    test_case       {"cateid":"100110102","shopid":"1","brandid":""}
+        |1435106916136 insert  e       Tgc00-wtjp2B_140918153515441    efgh    test_case       {"cateid":"101104107","shopid":"2","brandid":""}
+      """.stripMargin.trim.split('\n')
+    val items = {
+      for {
+        line <- data
+        item <- CounterEtlFunctions.parseEdgeFormat(line)
+      } yield {
+        item.action should equal("test_case")
+        item
+      }
+    }
+
+    items should have size 2
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2counter_loader/src/test/scala/org/apache/s2graph/counter/loader/core/DimensionPropsTest.scala
----------------------------------------------------------------------
diff --git a/s2counter_loader/src/test/scala/org/apache/s2graph/counter/loader/core/DimensionPropsTest.scala b/s2counter_loader/src/test/scala/org/apache/s2graph/counter/loader/core/DimensionPropsTest.scala
new file mode 100644
index 0000000..c0f1db7
--- /dev/null
+++ b/s2counter_loader/src/test/scala/org/apache/s2graph/counter/loader/core/DimensionPropsTest.scala
@@ -0,0 +1,46 @@
+package org.apache.s2graph.counter.loader.core
+
+import org.scalatest.{FunSuite, Matchers}
+
+import scala.collection.mutable.ListBuffer
+
+/**
+ * Created by hsleep([email protected]) on 2015. 10. 7..
+ */
+class DimensionPropsTest extends FunSuite with Matchers {
+  test("makeRequestBody with Seq") {
+    val requestBody =
+      """
+        |{
+        |  "_from" => [[_from]]
+        |}
+      """.stripMargin
+    val requestBodyExpected =
+      """
+        |{
+        |  "_from" => 1
+        |}
+      """.stripMargin
+    val requestBodyResult = DimensionProps.makeRequestBody(requestBody, Seq(("[[_from]]", "1")).toList)
+
+    requestBodyResult shouldEqual requestBodyExpected
+  }
+
+  test("makeRequestBody with ListBuffer") {
+    val requestBody =
+      """
+        |{
+        |  "_from" => [[_from]]
+        |}
+      """.stripMargin
+    val requestBodyExpected =
+      """
+        |{
+        |  "_from" => 1
+        |}
+      """.stripMargin
+    val requestBodyResult = DimensionProps.makeRequestBody(requestBody, ListBuffer(("[[_from]]", "1")).toList)
+
+    requestBodyResult shouldEqual requestBodyExpected
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2counter_loader/src/test/scala/org/apache/s2graph/counter/loader/stream/ExactCounterStreamingSpec.scala
----------------------------------------------------------------------
diff --git a/s2counter_loader/src/test/scala/org/apache/s2graph/counter/loader/stream/ExactCounterStreamingSpec.scala b/s2counter_loader/src/test/scala/org/apache/s2graph/counter/loader/stream/ExactCounterStreamingSpec.scala
new file mode 100644
index 0000000..b716f86
--- /dev/null
+++ b/s2counter_loader/src/test/scala/org/apache/s2graph/counter/loader/stream/ExactCounterStreamingSpec.scala
@@ -0,0 +1,198 @@
+package org.apache.s2graph.counter.loader.stream
+
+import org.apache.s2graph.core.{Management, GraphUtil}
+import org.apache.s2graph.core.mysqls.Label
+import org.apache.s2graph.counter.loader.counter.core.{CounterEtlItem, CounterFunctions}
+import org.apache.s2graph.counter.loader.models.DefaultCounterModel
+import org.apache.s2graph.spark.config.S2ConfigFactory
+import org.apache.s2graph.spark.spark.HashMapParam
+import org.apache.spark.streaming.{Seconds, StreamingContext}
+import org.apache.spark.{SparkConf, SparkContext}
+import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
+import play.api.libs.json.Json
+import CounterFunctions.HashMapAccumulable
+import s2.counter.core.TimedQualifier.IntervalUnit
+import s2.counter.core._
+import s2.counter.core.v2.{ExactStorageGraph, GraphOperation, RankingStorageGraph}
+import s2.helper.CounterAdmin
+import s2.models.{Counter, DBModel}
+
+import scala.collection.mutable.{HashMap => MutableHashMap}
+import scala.concurrent.ExecutionContext.Implicits.global
+import scala.util.{Failure, Success}
+
+/**
+  * Created by hsleep([email protected]) on 2015. 11. 19..
+  */
+class ExactCounterStreamingSpec extends FlatSpec with Matchers with BeforeAndAfterAll {
+  private val master = "local[2]"
+  private val appName = "exact_counter_streaming"
+  private val batchDuration = Seconds(1)
+
+  private var sc: SparkContext = _
+  private var ssc: StreamingContext = _
+
+  val admin = new CounterAdmin(S2ConfigFactory.config)
+  val graphOp = new GraphOperation(S2ConfigFactory.config)
+  val s2config = new S2CounterConfig(S2ConfigFactory.config)
+
+  val exactCounter = new ExactCounter(S2ConfigFactory.config, new ExactStorageGraph(S2ConfigFactory.config))
+  val rankingCounter = new RankingCounter(S2ConfigFactory.config, new RankingStorageGraph(S2ConfigFactory.config))
+
+  val service = "test"
+  val action = "test_case"
+
+  override def beforeAll(): Unit = {
+    DBModel.initialize(S2ConfigFactory.config)
+
+    val conf = new SparkConf()
+      .setMaster(master)
+      .setAppName(appName)
+
+    ssc = new StreamingContext(conf, batchDuration)
+
+    sc = ssc.sparkContext
+
+    // create test_case label
+    Management.createService(service, s2config.HBASE_ZOOKEEPER_QUORUM, s"${service}_dev", 1, None, "gz")
+    if (Label.findByName(action, useCache = false).isEmpty) {
+      val strJs =
+        s"""
+           |{
+           |  "label": "$action",
+           |  "srcServiceName": "$service",
+           |  "srcColumnName": "src",
+           |  "srcColumnType": "string",
+           |  "tgtServiceName": "$service",
+           |  "tgtColumnName": "$action",
+           |  "tgtColumnType": "string",
+           |  "indices": [
+           |  ],
+           |  "props": [
+           |  ]
+           |}
+       """.stripMargin
+      graphOp.createLabel(Json.parse(strJs))
+    }
+
+    // action
+    admin.deleteCounter(service, action).foreach {
+      case Success(v) =>
+      case Failure(ex) =>
+        println(s"$ex")
+    }
+    admin.createCounter(Counter(useFlag = true, 2, service, action, Counter.ItemType.STRING, autoComb = true, "is_shared,relationship", useRank = true))
+  }
+
+  override def afterAll(): Unit = {
+    admin.deleteCounter(service, action)
+    if (ssc != null) {
+      ssc.stop()
+    }
+  }
+
+  "ExactCounter" should "update" in {
+    val policy = DefaultCounterModel.findByServiceAction(service, action).get
+    val data =
+      s"""
+        |1434534565675 $service        $action 70362200_94013572857366866      {"is_shared":"false","relationship":"FE"}       {"userId":"48255079","userIdType":"profile_id","value":"1"}
+        |1434534565675 $service        $action 46889329_94013502934177075      {"is_shared":"false","relationship":"FE"}       {"userId":"48255079","userIdType":"profile_id","value":"1"}
+        |1434534566220 $service        $action 51223360_94013140590929619      {"is_shared":"false","relationship":"FE"}       {"userId":"312383","userIdType":"profile_id","value":"1"}
+        |1434534566508 $service        $action 63808459_94013420047377826      {"is_shared":"false","relationship":"FE"}       {"userId":"21968241","userIdType":"profile_id","value":"1"}
+        |1434534566210 $service        $action 46889329_94013502934177075      {"is_shared":"false","relationship":"FE"}       {"userId":"6062217","userIdType":"profile_id","value":"1"}
+        |1434534566459 $service        $action 49699692_94012186431261763      {"is_shared":"false","relationship":"FE"}       {"userId":"67863471","userIdType":"profile_id","value":"1"}
+        |1434534565681 $service        $action 64556827_94012311028641810      {"is_shared":"false","relationship":"FE"}       {"userId":"19381218","userIdType":"profile_id","value":"1"}
+        |1434534565865 $service        $action 41814266_94012477588942163      {"is_shared":"false","relationship":"FE"}       {"userId":"19268547","userIdType":"profile_id","value":"1"}
+        |1434534565865 $service        $action 66697741_94007840665633458      {"is_shared":"false","relationship":"FE"}       {"userId":"19268547","userIdType":"profile_id","value":"1"}
+        |1434534566142 $service        $action 66444074_94012737377133826      {"is_shared":"false","relationship":"FE"}       {"userId":"11917195","userIdType":"profile_id","value":"1"}
+        |1434534566077 $service        $action 46889329_94013502934177075      {"is_shared":"false","relationship":"FE"}       {"userId":"37709890","userIdType":"profile_id","value":"1"}
+        |1434534565938 $service        $action 40921487_94012905738975266      {"is_shared":"false","relationship":"FE"}       {"userId":"59869223","userIdType":"profile_id","value":"1"}
+        |1434534566033 $service        $action 64506628_93994707216829506      {"is_shared":"false","relationship":"FE"}       {"userId":"50375575","userIdType":"profile_id","value":"1"}
+        |1434534566451 $service        $action 40748868_94013448321919139      {"is_shared":"false","relationship":"FE"}       {"userId":"12249539","userIdType":"profile_id","value":"1"}
+        |1434534566669 $service        $action 64499956_94013227717457106      {"is_shared":"false","relationship":"FE"}       {"userId":"25167419","userIdType":"profile_id","value":"1"}
+        |1434534566669 $service        $action 66444074_94012737377133826      {"is_shared":"false","relationship":"FE"}       {"userId":"25167419","userIdType":"profile_id","value":"1"}
+        |1434534566318 $service        $action 64774665_94012837889027027      {"is_shared":"true","relationship":"F"} {"userId":"71557816","userIdType":"profile_id","value":"1"}
+        |1434534566274 $service        $action 67075480_94008509166933763      {"is_shared":"false","relationship":"FE"}       {"userId":"57931860","userIdType":"profile_id","value":"1"}
+        |1434534566659 $service        $action 46889329_94013502934177075      {"is_shared":"false","relationship":"FE"}       {"userId":"19990823","userIdType":"profile_id","value":"1"}
+        |1434534566250 $service        $action 70670053_93719933175630611      {"is_shared":"true","relationship":"F"} {"userId":"68897412","userIdType":"profile_id","value":"1"}
+        |1434534566402 $service        $action 46889329_94013502934177075      {"is_shared":"false","relationship":"FE"}       {"userId":"15541439","userIdType":"profile_id","value":"1"}
+        |1434534566122 $service        $action 48890741_94013463616012786      {"is_shared":"false","relationship":"FE"}       {"userId":"48040409","userIdType":"profile_id","value":"1"}
+        |1434534566055 $service        $action 64509008_94002318232678546      {"is_shared":"true","relationship":"F"} {"userId":"46532039","userIdType":"profile_id","value":"1"}
+        |1434534565994 $service        $action 66644368_94009163363033795      {"is_shared":"false","relationship":"FE"}       {"userId":"4143147","userIdType":"profile_id","value":"1"}
+        |1434534566448 $service        $action 64587644_93938555963733954      {"is_shared":"false","relationship":"FE"}       {"userId":"689042","userIdType":"profile_id","value":"1"}
+        |1434534565935 $service        $action 52812511_94012009551561315      {"is_shared":"false","relationship":"FE"}       {"userId":"35509692","userIdType":"profile_id","value":"1"}
+        |1434534566544 $service        $action 70452048_94008573197583762      {"is_shared":"false","relationship":"FE"}       {"userId":"5172421","userIdType":"profile_id","value":"1"}
+        |1434534565929 $service        $action 54547023_94013384964278435      {"is_shared":"false","relationship":"FE"}       {"userId":"33556498","userIdType":"profile_id","value":"1"}
+        |1434534566358 $service        $action 46889329_94013502934177075      {"is_shared":"false","relationship":"FE"}       {"userId":"8987346","userIdType":"profile_id","value":"1"}
+        |1434534566057 $service        $action 67075480_94008509166933763      {"is_shared":"false","relationship":"FE"}       {"userId":"35134964","userIdType":"profile_id","value":"1"}
+        |1434534566140 $service        $action 54547023_94013384964278435      {"is_shared":"false","relationship":"FE"}       {"userId":"11900315","userIdType":"profile_id","value":"1"}
+        |1434534566158 $service        $action 64639374_93888330176053635      {"is_shared":"true","relationship":"F"} {"userId":"49996643","userIdType":"profile_id","value":"1"}
+        |1434534566025 $service        $action 67265128_94009084771192002      {"is_shared":"false","relationship":"FE"}       {"userId":"37801480","userIdType":"profile_id","value":"1"}
+      """.stripMargin.trim
+    //    println(data)
+    val rdd = sc.parallelize(Seq(("", data)))
+
+    //    rdd.foreachPartition { part =>
+    //      part.foreach(println)
+    //    }
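+    // aggregate the raw lines into (ExactKey, qualifier -> count) pairs; compared below against a locally parsed result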
+    val resultRdd = CounterFunctions.makeExactRdd(rdd, 2)
+    val result = resultRdd.collect().toMap
+
+    //    result.foreachPartition { part =>
+    //      part.foreach(println)
+    //    }
+
+    val parsed = {
+      for {
+        line <- GraphUtil.parseString(data)
+        item <- CounterEtlItem(line).toSeq
+        ev <- CounterFunctions.exactMapper(item).toSeq
+      } yield {
+        ev
+      }
+    }
+    val parsedResult = parsed.groupBy(_._1).mapValues(values => values.map(_._2).reduce(CounterFunctions.reduceValue[ExactQualifier, Long](_ + _, 0L)))
+
+    //    parsedResult.foreach { case (k, v) =>
+    //      println(k, v)
+    //    }
+
+    result should not be empty
+    result should equal (parsedResult)
+
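+    // this item appears six times in the sample data, so its total count is expected to be 6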
+    val itemId = "46889329_94013502934177075"
+    val key = ExactKey(DefaultCounterModel.findByServiceAction(service, action).get, itemId, checkItemType = true)
+    val value = result.get(key)
+
+    value should not be empty
+    value.get.get(ExactQualifier(TimedQualifier("t", 0), Map.empty[String, String])) should equal (Some(6L))
+
+    exactCounter.getCount(policy, itemId, Seq(IntervalUnit.TOTAL), 0, 0, Map.empty[String, Set[String]]) should be (None)
+
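+    // accumulator named "Throughput" gathers per-key counts while the partitions are written to the exact counter storage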
+    val acc: HashMapAccumulable = sc.accumulable(MutableHashMap.empty[String, Long], "Throughput")(HashMapParam[String, Long](_ + _))
+    resultRdd.foreachPartition { part =>
+      CounterFunctions.updateExactCounter(part.toSeq, acc)
+    }
+
+    Option(FetchedCountsGrouped(key, Map(
+      (IntervalUnit.TOTAL, Map.empty[String, String]) -> Map(ExactQualifier(TimedQualifier("t", 0), "") -> 6l)
+    ))).foreach { expected =>
+      exactCounter.getCount(policy, itemId, Seq(IntervalUnit.TOTAL), 0, 0, Map.empty[String, Set[String]]) should be (Some(expected))
+    }
+    Option(FetchedCountsGrouped(key, Map(
+      (IntervalUnit.TOTAL, Map("is_shared" -> "false")) -> Map(ExactQualifier(TimedQualifier("t", 0), "is_shared.false") -> 6l)
+    ))).foreach { expected =>
+      exactCounter.getCount(policy, itemId, Seq(IntervalUnit.TOTAL), 0, 0, Map("is_shared" -> Set("false"))) should be (Some(expected))
+    }
+    Option(FetchedCountsGrouped(key, Map(
+      (IntervalUnit.TOTAL, Map("relationship" -> "FE")) -> Map(ExactQualifier(TimedQualifier("t", 0), "relationship.FE") -> 6l)
+    ))).foreach { expected =>
+      exactCounter.getCount(policy, itemId, Seq(IntervalUnit.TOTAL), 0, 0, Map("relationship" -> Set("FE"))) should be (Some(expected))
+    }
+    Option(FetchedCountsGrouped(key, Map(
+      (IntervalUnit.TOTAL, Map("is_shared" -> "false", "relationship" -> "FE")) -> Map(ExactQualifier(TimedQualifier("t", 0), "is_shared.relationship.false.FE") -> 6l)
+    ))).foreach { expected =>
+      exactCounter.getCount(policy, itemId, Seq(IntervalUnit.TOTAL), 0, 0, Map("is_shared" -> Set("false"), "relationship" -> Set("FE"))) should be (Some(expected))
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2counter_loader/src/test/scala/org/apache/s2graph/counter/loader/stream/RankingCounterStreamingSpec.scala
----------------------------------------------------------------------
diff --git a/s2counter_loader/src/test/scala/org/apache/s2graph/counter/loader/stream/RankingCounterStreamingSpec.scala b/s2counter_loader/src/test/scala/org/apache/s2graph/counter/loader/stream/RankingCounterStreamingSpec.scala
new file mode 100644
index 0000000..2a913e5
--- /dev/null
+++ b/s2counter_loader/src/test/scala/org/apache/s2graph/counter/loader/stream/RankingCounterStreamingSpec.scala
@@ -0,0 +1,451 @@
+package org.apache.s2graph.counter.loader.stream
+
+import org.apache.s2graph.core.Management
+import org.apache.s2graph.core.mysqls.Label
+import org.apache.s2graph.counter.loader.counter.core.CounterFunctions
+import org.apache.s2graph.counter.loader.models.DefaultCounterModel
+import org.apache.s2graph.spark.config.S2ConfigFactory
+import org.apache.s2graph.spark.spark.HashMapParam
+import org.apache.spark.streaming.{Seconds, StreamingContext}
+import org.apache.spark.{SparkConf, SparkContext}
+import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
+import play.api.libs.json.Json
+import CounterFunctions.HashMapAccumulable
+import s2.counter.core.TimedQualifier.IntervalUnit
+import s2.counter.core._
+import s2.counter.core.v2.{ExactStorageGraph, GraphOperation, RankingStorageGraph}
+import s2.helper.CounterAdmin
+import s2.models.{Counter, DBModel}
+
+import scala.collection.mutable.{HashMap => MutableHashMap}
+import scala.concurrent.ExecutionContext.Implicits.global
+import scala.util.{Failure, Success}
+
+/**
+ * Created by hsleep([email protected]) on 15. 6. 17..
+ */
+class RankingCounterStreamingSpec extends FlatSpec with BeforeAndAfterAll with Matchers {
+  private val master = "local[2]"
+  private val appName = "ranking_counter_streaming"
+  private val batchDuration = Seconds(1)
+
+  private var sc: SparkContext = _
+  private var ssc: StreamingContext = _
+
+  val admin = new CounterAdmin(S2ConfigFactory.config)
+  val graphOp = new GraphOperation(S2ConfigFactory.config)
+  val s2config = new S2CounterConfig(S2ConfigFactory.config)
+
+  val exactCounter = new ExactCounter(S2ConfigFactory.config, new ExactStorageGraph(S2ConfigFactory.config))
+  val rankingCounter = new RankingCounter(S2ConfigFactory.config, new RankingStorageGraph(S2ConfigFactory.config))
+
+  val service = "test"
+  val action = "test_case"
+  val action_base = "test_case_base"
+  val action_rate = "test_case_rate"
+  val action_rate_threshold = "test_case_rate_threshold"
+  val action_trend = "test_case_trend"
+
+  override def beforeAll(): Unit = {
+    DBModel.initialize(S2ConfigFactory.config)
+
+    val conf = new SparkConf()
+      .setMaster(master)
+      .setAppName(appName)
+
+    ssc = new StreamingContext(conf, batchDuration)
+
+    sc = ssc.sparkContext
+
+    admin.setupCounterOnGraph()
+
+    // create test_case label
+    Management.createService(service, s2config.HBASE_ZOOKEEPER_QUORUM, s"${service}_dev", 1, None, "gz")
+    if (Label.findByName(action, useCache = false).isEmpty) {
+      val strJs =
+        s"""
+           |{
+           |  "label": "$action",
+           |  "srcServiceName": "$service",
+           |  "srcColumnName": "src",
+           |  "srcColumnType": "string",
+           |  "tgtServiceName": "$service",
+           |  "tgtColumnName": "$action",
+           |  "tgtColumnType": "string",
+           |  "indices": [
+           |  ],
+           |  "props": [
+           |  ]
+           |}
+       """.stripMargin
+      graphOp.createLabel(Json.parse(strJs))
+    }
+    if (Label.findByName(action_base, useCache = false).isEmpty) {
+      val strJs =
+        s"""
+           |{
+           |  "label": "$action_base",
+           |  "srcServiceName": "$service",
+           |  "srcColumnName": "src",
+           |  "srcColumnType": "string",
+           |  "tgtServiceName": "$service",
+           |  "tgtColumnName": "$action",
+           |  "tgtColumnType": "string",
+           |  "indices": [
+           |  ],
+           |  "props": [
+           |  ]
+           |}
+       """.stripMargin
+      graphOp.createLabel(Json.parse(strJs))
+    }
+
+    // action
+    admin.deleteCounter(service, action).foreach {
+      case Success(v) =>
+      case Failure(ex) =>
+        println(s"$ex")
+    }
+    admin.createCounter(Counter(useFlag = true, 2, service, action, Counter.ItemType.STRING, autoComb = true, "", useRank = true))
+    val policy = DefaultCounterModel.findByServiceAction(service, action).get
+
+    // action_base
+    admin.deleteCounter(service, action_base).foreach {
+      case Success(v) =>
+      case Failure(ex) =>
+        println(s"$ex")
+    }
+    admin.createCounter(Counter(useFlag = true, 2, service, action_base, Counter.ItemType.STRING, autoComb = true, "", useRank = true))
+    val basePolicy = DefaultCounterModel.findByServiceAction(service, action_base).get
+
+    // action_rate
+    admin.deleteCounter(service, action_rate).foreach {
+      case Success(v) =>
+      case Failure(ex) =>
+        println(s"$ex")
+    }
+    admin.createCounter(Counter(useFlag = true, 2, service, action_rate, Counter.ItemType.STRING, autoComb = true, "gender,p1", useRank = true,
+      rateActionId = Some(policy.id), rateBaseId = Some(basePolicy.id)))
+
+    // action_rate_threshold
+    admin.deleteCounter(service, action_rate_threshold).foreach {
+      case Success(v) =>
+      case Failure(ex) =>
+        println(s"$ex")
+    }
+    admin.createCounter(Counter(useFlag = true, 2, service, action_rate_threshold, Counter.ItemType.STRING, autoComb = true, "gender,p1", useRank = true,
+      rateActionId = Some(policy.id), rateBaseId = Some(basePolicy.id), rateThreshold = Some(3)))
+
+    // action_trend
+    admin.deleteCounter(service, action_trend).foreach {
+      case Success(v) =>
+      case Failure(ex) =>
+        println(s"$ex")
+    }
+    admin.createCounter(Counter(useFlag = true, 2, service, action_trend, Counter.ItemType.STRING, autoComb = true, "p1", useRank = true,
+      rateActionId = Some(policy.id), rateBaseId = Some(policy.id)))
+  }
+
+  override def afterAll(): Unit = {
+    admin.deleteCounter(service, action)
+    admin.deleteCounter(service, action_base)
+    admin.deleteCounter(service, action_rate)
+    admin.deleteCounter(service, action_rate_threshold)
+    admin.deleteCounter(service, action_trend)
+    if (ssc != null) {
+      ssc.stop()
+    }
+  }
+
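+  // parse ranking transaction logs, aggregate per-item RankingValue, then delete, update and read back the top-K ranking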
+  "RankingCounterStreaming" should "update" in {
+    val policy = DefaultCounterModel.findByServiceAction(service, action).get
+//    val basePolicy = DefaultCounterModel.findByServiceAction(service, action_base).get
+
+    rankingCounter.ready(policy) should equal (true)
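+    // counter transaction logs: one JSON document per line with success flag, policyId, item and per-interval results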
+    val data =
+      s"""
+         |{"success":true,"policyId":${policy.id},"item":"1","results":[{"interval":"M","dimension":"","ts":1433084400000,"value":1,"result":3}]}
+         |{"success":false,"policyId":${policy.id},"item":"2","results":[{"interval":"M","dimension":"","ts":1433084400000,"value":1,"result":2}]}
+         |{"success":true,"policyId":${policy.id},"item":"3","results":[{"interval":"M","dimension":"","ts":1433084400000,"value":1,"result":1}]}
+         |{"success":true,"policyId":${policy.id},"item":"3","results":[{"interval":"M","dimension":"","ts":1433084400000,"value":1,"result":2}]}
+         |{"success":true,"policyId":${policy.id},"item":"4","results":[{"interval":"M","dimension":"","ts":1433084400000,"value":1,"result":1}]}
+      """.stripMargin.trim
+    //    println(data)
+    val rdd = sc.parallelize(Seq(("", data)))
+
+    //    rdd.foreachPartition { part =>
+    //      part.foreach(println)
+    //    }
+
+    val result = CounterFunctions.makeRankingRdd(rdd, 2).collect().toMap
+
+    //    result.foreachPartition { part =>
+    //      part.foreach(println)
+    //    }
+
+    val acc: HashMapAccumulable = sc.accumulable(MutableHashMap.empty[String, Long], "Throughput")(HashMapParam[String, Long](_ + _))
+
+    result should not be empty
+    val rankKey = RankingKey(policy.id, policy.version, ExactQualifier(TimedQualifier("M", 1433084400000L), ""))
+    result should contain (rankKey -> Map(
+      "1" -> RankingValue(3, 1),
+      "3" -> RankingValue(2, 2),
+      "4" -> RankingValue(1, 1)
+    ))
+
+    val key = RankingKey(policy.id, policy.version, ExactQualifier(TimedQualifier("M", 1433084400000L), ""))
+    val value = result.get(key)
+
+    value should not be empty
+    value.get.get("1").get should equal (RankingValue(3, 1))
+    value.get.get("2") shouldBe empty
+    value.get.get("3").get should equal (RankingValue(2, 2))
+
+    rankingCounter.ready(policy) should equal (true)
+
+    // delete, update and get
+    rankingCounter.delete(key)
+    Thread.sleep(1000)
+    CounterFunctions.updateRankingCounter(Seq((key, value.get)), acc)
+    Thread.sleep(1000)
+    val rst = rankingCounter.getTopK(key)
+
+    rst should not be empty
+//    rst.get.totalScore should equal(4f)
+    rst.get.values should contain allOf(("3", 2d), ("4", 1d), ("1", 3d))
+  }
+
+//  "rate by base" >> {
+//    val data =
+//      """
+//        |{"success":true,"policyId":42,"item":"2","results":[{"interval":"M","dimension":"","ts":1433084400000,"value":2,"result":4}]}
+//      """.stripMargin.trim
+//    val rdd = sc.parallelize(Seq(("", data)))
+//
+//    val trxLogRdd = CounterFunctions.makeTrxLogRdd(rdd, 2).collect()
+//    trxLogRdd.foreach { log =>
+//      CounterFunctions.rateBaseRankingMapper(log) must not be empty
+//    }
+//
+//    true must_== true
+//  }
+
+  it should "update rate ranking counter" in {
+    val policy = DefaultCounterModel.findByServiceAction(service, action).get
+    val basePolicy = DefaultCounterModel.findByServiceAction(service, action_base).get
+    val ratePolicy = DefaultCounterModel.findByServiceAction(service, action_rate).get
+
+    // update base policy
+    val eq = ExactQualifier(TimedQualifier("M", 1433084400000l), "")
+    val exactKey = ExactKey(basePolicy, "1", checkItemType = true)
+
+    // check base item count
+    exactCounter.updateCount(basePolicy, Seq(
+      (exactKey, Map(eq -> 2l))
+    ))
+    Thread.sleep(1000)
+
+    // direct get
+    val baseCount = exactCounter.getCount(basePolicy, "1", Seq(IntervalUnit.MONTHLY), 1433084400000l, 1433084400000l, Map.empty[String, Set[String]])
+    baseCount should not be empty
+    baseCount.get should equal (FetchedCountsGrouped(exactKey, Map(
+      (eq.tq.q, Map.empty[String, String]) -> Map(eq-> 2l)
+    )))
+
+    // related get
+    val relatedCount = exactCounter.getRelatedCounts(basePolicy, Seq("1" -> Seq(eq)))
+    relatedCount should not be empty
+    relatedCount.get("1") should not be empty
+    relatedCount.get("1").get should equal (Map(eq -> 2l))
+
+    val data =
+      s"""
+        |{"success":true,"policyId":${policy.id},"item":"2","results":[{"interval":"M","dimension":"","ts":1433084400000,"value":1,"result":1}]}
+        |{"success":true,"policyId":${policy.id},"item":"2","results":[{"interval":"M","dimension":"gender.M","ts":1433084400000,"value":1,"result":1}]}
+        |{"success":true,"policyId":${basePolicy.id},"item":"2","results":[{"interval":"M","dimension":"","ts":1433084400000,"value":2,"result":4}]}
+        |{"success":true,"policyId":${basePolicy.id},"item":"2","results":[{"interval":"M","dimension":"gender.M","ts":1433084400000,"value":2,"result":4}]}
+        |{"success":true,"policyId":${policy.id},"item":"2","results":[{"interval":"M","dimension":"p1.1","ts":1433084400000,"value":1,"result":1}]}
+        |{"success":true,"policyId":${policy.id},"item":"1","results":[{"interval":"M","dimension":"","ts":1433084400000,"value":1,"result":1}]}
+        |{"success":true,"policyId":${basePolicy.id},"item":"2","results":[{"interval":"M","dimension":"p1.1","ts":1433084400000,"value":2,"result":4}]}
+        |{"success":true,"policyId":${policy.id},"item":"1","results":[{"interval":"M","dimension":"","ts":1433084400000,"value":1,"result":2}]}
+      """.stripMargin.trim
+    //    println(data)
+    val rdd = sc.parallelize(Seq(("", data)))
+
+    //    rdd.foreachPartition { part =>
+    //      part.foreach(println)
+    //    }
+
+    val trxLogRdd = CounterFunctions.makeTrxLogRdd(rdd, 2)
+    trxLogRdd.count() should equal (data.trim.split('\n').length)
+
+    val itemRankingRdd = CounterFunctions.makeItemRankingRdd(trxLogRdd, 2)
+    itemRankingRdd.foreach(println)
+
+    val result = CounterFunctions.rateRankingCount(itemRankingRdd, 2).collect().toMap.filterKeys(key => key.policyId == ratePolicy.id)
+    result.foreach(println)
+    result should have size 3
+
+    val acc: HashMapAccumulable = sc.accumulable(MutableHashMap.empty[String, Long], "Throughput")(HashMapParam[String, Long](_ + _))
+
+    // rate ranking
+    val key = RankingKey(ratePolicy.id, 2, ExactQualifier(TimedQualifier("M", 1433084400000L), ""))
+    val value = result.get(key)
+
+//    println(key, value)
+
+    value should not be empty
+    value.get.get("1") should not be empty
+    value.get.get("1").get should equal (RankingValue(1, 0))
+    value.get.get("2").get should equal (RankingValue(0.25, 0))
+
+    val key2 = RankingKey(ratePolicy.id, 2, ExactQualifier(TimedQualifier("M", 1433084400000L), "p1.1"))
+    val value2 = result.get(key2)
+
+//    println(key2, value2)
+
+    val values = value.map(v => (key, v)).toSeq ++ value2.map(v => (key2, v)).toSeq
+    println(s"values: $values")
+
+    // delete, update and get
+    rankingCounter.delete(key)
+    rankingCounter.delete(key2)
+    Thread.sleep(1000)
+    CounterFunctions.updateRankingCounter(values, acc)
+    // for update graph
+    Thread.sleep(1000)
+
+    val rst = rankingCounter.getTopK(key)
+    rst should not be empty
+    rst.get.values should equal (Seq(("1", 1d), ("2", 0.25d)))
+
+    val rst2 = rankingCounter.getTopK(key2)
+    rst2 should not be empty
+    rst2.get.values should equal (Seq(("2", 0.25d)))
+  }
+
+  it should "update rate ranking counter with threshold" in {
+    val policy = DefaultCounterModel.findByServiceAction(service, action).get
+    val basePolicy = DefaultCounterModel.findByServiceAction(service, action_base).get
+    val ratePolicy = DefaultCounterModel.findByServiceAction(service, action_rate_threshold).get
+
+    val data =
+      s"""
+        |{"success":true,"policyId":${policy.id},"item":"1","results":[{"interval":"M","dimension":"","ts":1433084400000,"value":1,"result":1}]}
+        |{"success":true,"policyId":${policy.id},"item":"1","results":[{"interval":"M","dimension":"","ts":1433084400000,"value":1,"result":2}]}
+        |{"success":true,"policyId":${policy.id},"item":"2","results":[{"interval":"M","dimension":"","ts":1433084400000,"value":1,"result":1}]}
+        |{"success":true,"policyId":${policy.id},"item":"2","results":[{"interval":"M","dimension":"gender.M","ts":1433084400000,"value":1,"result":1}]}
+        |{"success":true,"policyId":${basePolicy.id},"item":"2","results":[{"interval":"M","dimension":"","ts":1433084400000,"value":2,"result":4}]}
+        |{"success":true,"policyId":${basePolicy.id},"item":"2","results":[{"interval":"M","dimension":"gender.M","ts":1433084400000,"value":2,"result":4}]}
+      """.stripMargin.trim
+    //    println(data)
+    val rdd = sc.parallelize(Seq(("", data)))
+
+    //    rdd.foreachPartition { part =>
+    //      part.foreach(println)
+    //    }
+
+    val trxLogRdd = CounterFunctions.makeTrxLogRdd(rdd, 2)
+    trxLogRdd.count() should equal (data.trim.split('\n').length)
+
+    val itemRankingRdd = CounterFunctions.makeItemRankingRdd(trxLogRdd, 2)
+    itemRankingRdd.foreach(println)
+
+    val result = CounterFunctions.rateRankingCount(itemRankingRdd, 2).collect().toMap.filterKeys(key => key.policyId == ratePolicy.id)
+    result.foreach(println)
+    result should have size 2
+
+    val acc: HashMapAccumulable = sc.accumulable(MutableHashMap.empty[String, Long], "Throughput")(HashMapParam[String, Long](_ + _))
+
+    // rate ranking
+    val key = RankingKey(ratePolicy.id, 2, ExactQualifier(TimedQualifier("M", 1433084400000L), ""))
+    val value = result.get(key)
+
+    value should not be empty
+    value.get.get("1") should be (None)
+    value.get.get("2").get should equal (RankingValue(0.25, 0))
+
+    // delete, update and get
+    rankingCounter.delete(key)
+    Thread.sleep(1000)
+    CounterFunctions.updateRankingCounter(Seq((key, value.get)), acc)
+    Thread.sleep(1000)
+    val rst = rankingCounter.getTopK(key)
+
+    rst should not be empty
+    rst.get.values should equal (Seq(("2", 0.25d)))
+  }
+
+  it should "update trend ranking counter" in {
+    val policy = DefaultCounterModel.findByServiceAction(service, action).get
+    val trendPolicy = DefaultCounterModel.findByServiceAction(service, action_trend).get
+
+    val exactKey1 = ExactKey(policy, "1", checkItemType = true)
+    val exactKey2 = ExactKey(policy, "2", checkItemType = true)
+    // update old key value
+    val tq1 = TimedQualifier("M", 1435676400000l)
+    val tq2 = TimedQualifier("M", 1427814000000l)
+    exactCounter.updateCount(policy, Seq(
+      exactKey1 -> Map(ExactQualifier(tq1.add(-1), "") -> 1l, ExactQualifier(tq2.add(-1), "") -> 92l)
+    ))
+    val eq1 = ExactQualifier(tq1, "")
+    val eq2 = ExactQualifier(tq2, "")
+
+    val oldCount = exactCounter.getPastCounts(policy, Seq("1" -> Seq(eq1, eq2), "2" -> Seq(eq1, eq1.copy(dimension = "gender.M"))))
+    oldCount should not be empty
+    oldCount.get("1").get should equal(Map(eq1 -> 1l, eq2 -> 92l))
+    oldCount.get("2") should be (None)
+
+    val data =
+      s"""
+        |{"success":true,"policyId":${policy.id},"item":"1","results":[{"interval":"M","dimension":"","ts":1435676400000,"value":1,"result":1}]}
+        |{"success":true,"policyId":${policy.id},"item":"1","results":[{"interval":"M","dimension":"","ts":1435676400000,"value":1,"result":2}]}
+        |{"success":true,"policyId":${policy.id},"item":"2","results":[{"interval":"M","dimension":"","ts":1435676400000,"value":1,"result":1}]}
+        |{"success":true,"policyId":${policy.id},"item":"2","results":[{"interval":"M","dimension":"gender.M","ts":1435676400000,"value":1,"result":1}]}
+        |{"success":true,"policyId":${policy.id},"item":"1","results":[{"interval":"M","dimension":"","ts":1427814000000,"value":1,"result":92}]}
+      """.stripMargin.trim
+    //    println(data)
+    val rdd = sc.parallelize(Seq(("", data)))
+
+    //    rdd.foreachPartition { part =>
+    //      part.foreach(println)
+    //    }
+
+    val trxLogRdd = CounterFunctions.makeTrxLogRdd(rdd, 2)
+    trxLogRdd.count() should equal (data.trim.split('\n').length)
+
+    val itemRankingRdd = CounterFunctions.makeItemRankingRdd(trxLogRdd, 2)
+    itemRankingRdd.foreach(println)
+
+    val result = CounterFunctions.trendRankingCount(itemRankingRdd, 2).collect().toMap
+    result.foreach(println)
+    // dimension gender.M is ignored because gender is not a defined dimension in the trend policy.
+    result should have size 2
+
+    val acc: HashMapAccumulable = sc.accumulable(MutableHashMap.empty[String, Long], "Throughput")(HashMapParam[String, Long](_ + _))
+
+    // trend ranking
+    val key = RankingKey(trendPolicy.id, 2, ExactQualifier(TimedQualifier("M", 1435676400000L), ""))
+    val value = result.get(key)
+
+    value should not be empty
+    value.get.get("1").get should equal (RankingValue(2, 0))
+    value.get.get("2").get should equal (RankingValue(1, 0))
+
+    val key2 = RankingKey(trendPolicy.id, 2, ExactQualifier(TimedQualifier("M", 1427814000000L), ""))
+    val value2 = result.get(key2)
+
+    value2 should not be empty
+    value2.get.get("1").get should equal (RankingValue(1, 0))
+
+    // delete, update and get
+    rankingCounter.delete(key)
+    Thread.sleep(1000)
+    CounterFunctions.updateRankingCounter(Seq((key, value.get)), acc)
+    Thread.sleep(1000)
+    val rst = rankingCounter.getTopK(key)
+
+    rst should not be empty
+    rst.get.values should equal (Seq("1" -> 2, "2" -> 1))
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2counter_loader/src/test/scala/s2/counter/core/CounterEtlFunctionsSpec.scala
----------------------------------------------------------------------
diff --git a/s2counter_loader/src/test/scala/s2/counter/core/CounterEtlFunctionsSpec.scala b/s2counter_loader/src/test/scala/s2/counter/core/CounterEtlFunctionsSpec.scala
deleted file mode 100644
index 520b30f..0000000
--- a/s2counter_loader/src/test/scala/s2/counter/core/CounterEtlFunctionsSpec.scala
+++ /dev/null
@@ -1,33 +0,0 @@
-package s2.counter.core
-
-import com.typesafe.config.ConfigFactory
-import org.scalatest.{Matchers, BeforeAndAfterAll, FlatSpec}
-import s2.models.DBModel
-
-/**
- * Created by hsleep([email protected]) on 15. 7. 3..
- */
-class CounterEtlFunctionsSpec extends FlatSpec with BeforeAndAfterAll with Matchers {
-  override def beforeAll: Unit = {
-    DBModel.initialize(ConfigFactory.load())
-  }
-
-  "CounterEtlFunctions" should "parsing log" in {
-    val data =
-      """
-        |1435107139287 insert  e       aaPHfITGUU0B_150212123559509    abcd    test_case       {"cateid":"100110102","shopid":"1","brandid":""}
-        |1435106916136 insert  e       Tgc00-wtjp2B_140918153515441    efgh    test_case       {"cateid":"101104107","shopid":"2","brandid":""}
-      """.stripMargin.trim.split('\n')
-    val items = {
-      for {
-        line <- data
-        item <- CounterEtlFunctions.parseEdgeFormat(line)
-      } yield {
-        item.action should equal("test_case")
-        item
-      }
-    }
-
-    items should have size 2
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2counter_loader/src/test/scala/s2/counter/core/DimensionPropsTest.scala
----------------------------------------------------------------------
diff --git a/s2counter_loader/src/test/scala/s2/counter/core/DimensionPropsTest.scala b/s2counter_loader/src/test/scala/s2/counter/core/DimensionPropsTest.scala
deleted file mode 100644
index b658d05..0000000
--- a/s2counter_loader/src/test/scala/s2/counter/core/DimensionPropsTest.scala
+++ /dev/null
@@ -1,46 +0,0 @@
-package s2.counter.core
-
-import org.scalatest.{FunSuite, Matchers}
-
-import scala.collection.mutable.ListBuffer
-
-/**
- * Created by hsleep([email protected]) on 2015. 10. 7..
- */
-class DimensionPropsTest extends FunSuite with Matchers {
-  test("makeRequestBody with Seq") {
-    val requestBody =
-      """
-        |{
-        |  "_from" => [[_from]]
-        |}
-      """.stripMargin
-    val requestBodyExpected =
-      """
-        |{
-        |  "_from" => 1
-        |}
-      """.stripMargin
-    val requestBodyResult = DimensionProps.makeRequestBody(requestBody, Seq(("[[_from]]", "1")).toList)
-
-    requestBodyResult shouldEqual requestBodyExpected
-  }
-
-  test("makeRequestBody with ListBuffer") {
-    val requestBody =
-      """
-        |{
-        |  "_from" => [[_from]]
-        |}
-      """.stripMargin
-    val requestBodyExpected =
-      """
-        |{
-        |  "_from" => 1
-        |}
-      """.stripMargin
-    val requestBodyResult = DimensionProps.makeRequestBody(requestBody, ListBuffer(("[[_from]]", "1")).toList)
-
-    requestBodyResult shouldEqual requestBodyExpected
-  }
-}

