Repository: incubator-s2graph
Updated Branches:
  refs/heads/master 09a7919de -> c2f3fc21b


[S2GRAPH-84]: Test-case compilation error on `s2counter_loader` project.

JIRA:
    [S2GRAPH-84] https://issues.apache.org/jira/browse/S2GRAPH-84

Pull Request:
    Closes #56

Authors:
    Jaesang Kim: <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/c2f3fc21
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/c2f3fc21
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/c2f3fc21

Branch: refs/heads/master
Commit: c2f3fc21b40bf21611e242c63146e847de28f5e5
Parents: 09a7919
Author: DO YUNG YOON <[email protected]>
Authored: Thu Sep 1 06:53:42 2016 +0900
Committer: DO YUNG YOON <[email protected]>
Committed: Thu Sep 1 06:53:42 2016 +0900

----------------------------------------------------------------------
 CHANGES                                         |   2 +
 .../loader/core/CounterEtlFunctionsSpec.scala   |  27 +-
 .../loader/core/DimensionPropsTest.scala        |   3 -
 .../stream/ExactCounterStreamingSpec.scala      | 217 ---------
 .../stream/RankingCounterStreamingSpec.scala    | 470 -------------------
 5 files changed, 23 insertions(+), 696 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/c2f3fc21/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index ebb11c6..7fd4600 100644
--- a/CHANGES
+++ b/CHANGES
@@ -174,6 +174,8 @@ Release 0.12.1 - unreleased
     S2GRAPH-103: Remove dependencies on custom fork of asynchbase (Committed by DOYUNG YOON).
 
     S2GRAPH-102: Add more configuration optoins in application.conf (Committed by DOYUNG YOON).
+    
+    S2GRAPH-84: Test-case compilation error on `s2counter_loader` project (Committed by Jaesang Kim).
 
   TEST
     

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/c2f3fc21/s2counter_loader/src/test/scala/org/apache/s2graph/counter/loader/core/CounterEtlFunctionsSpec.scala
----------------------------------------------------------------------
diff --git a/s2counter_loader/src/test/scala/org/apache/s2graph/counter/loader/core/CounterEtlFunctionsSpec.scala b/s2counter_loader/src/test/scala/org/apache/s2graph/counter/loader/core/CounterEtlFunctionsSpec.scala
index 5b41434..8ae1de3 100644
--- a/s2counter_loader/src/test/scala/org/apache/s2graph/counter/loader/core/CounterEtlFunctionsSpec.scala
+++ b/s2counter_loader/src/test/scala/org/apache/s2graph/counter/loader/core/CounterEtlFunctionsSpec.scala
@@ -20,15 +20,30 @@
 package org.apache.s2graph.counter.loader.core
 
 import com.typesafe.config.ConfigFactory
-import org.scalatest.{Matchers, BeforeAndAfterAll, FlatSpec}
-import s2.models.DBModel
+import org.apache.s2graph.core.mysqls.{Label, Service}
+import org.apache.s2graph.core.types.HBaseType
+import org.apache.s2graph.core.{Graph, Management}
+import org.apache.s2graph.counter.models.DBModel
+import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
+
+import scala.concurrent.ExecutionContext.Implicits.global
 
-/**
- * Created by hsleep([email protected]) on 15. 7. 3..
- */
 class CounterEtlFunctionsSpec extends FlatSpec with BeforeAndAfterAll with 
Matchers {
+  val config = ConfigFactory.load()
+  val cluster = config.getString("hbase.zookeeper.quorum")
+  DBModel.initialize(config)
+
+  val graph = new Graph(config)(global)
+  val management = new Management(graph)
+
   override def beforeAll: Unit = {
-    DBModel.initialize(ConfigFactory.load())
+    management.createService("test", cluster, "test", 1, None, "gz")
+    management.createLabel("test_case", "test", "src", "string", "test", 
"tgt", "string", true, "test", Nil, Nil, "weak", None, None, 
HBaseType.DEFAULT_VERSION, false, "gz")
+  }
+
+  override def afterAll: Unit = {
+    Label.delete(Label.findByName("test_case", false).get.id.get)
+    Service.delete(Service.findByName("test", false).get.id.get)
   }
 
   "CounterEtlFunctions" should "parsing log" in {

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/c2f3fc21/s2counter_loader/src/test/scala/org/apache/s2graph/counter/loader/core/DimensionPropsTest.scala
----------------------------------------------------------------------
diff --git a/s2counter_loader/src/test/scala/org/apache/s2graph/counter/loader/core/DimensionPropsTest.scala b/s2counter_loader/src/test/scala/org/apache/s2graph/counter/loader/core/DimensionPropsTest.scala
index 1bac7e1..e2b5ef9 100644
--- a/s2counter_loader/src/test/scala/org/apache/s2graph/counter/loader/core/DimensionPropsTest.scala
+++ b/s2counter_loader/src/test/scala/org/apache/s2graph/counter/loader/core/DimensionPropsTest.scala
@@ -23,9 +23,6 @@ import org.scalatest.{FunSuite, Matchers}
 
 import scala.collection.mutable.ListBuffer
 
-/**
- * Created by hsleep([email protected]) on 2015. 10. 7..
- */
 class DimensionPropsTest extends FunSuite with Matchers {
   test("makeRequestBody with Seq") {
     val requestBody =

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/c2f3fc21/s2counter_loader/src/test/scala/org/apache/s2graph/counter/loader/stream/ExactCounterStreamingSpec.scala
----------------------------------------------------------------------
diff --git a/s2counter_loader/src/test/scala/org/apache/s2graph/counter/loader/stream/ExactCounterStreamingSpec.scala b/s2counter_loader/src/test/scala/org/apache/s2graph/counter/loader/stream/ExactCounterStreamingSpec.scala
deleted file mode 100644
index 76ad80d..0000000
--- a/s2counter_loader/src/test/scala/org/apache/s2graph/counter/loader/stream/ExactCounterStreamingSpec.scala
+++ /dev/null
@@ -1,217 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.s2graph.counter.loader.stream
-
-import org.apache.s2graph.core.{Management, GraphUtil}
-import org.apache.s2graph.core.mysqls.Label
-import org.apache.s2graph.counter.loader.counter.core.{CounterEtlItem, 
CounterFunctions}
-import org.apache.s2graph.counter.loader.models.DefaultCounterModel
-import org.apache.s2graph.spark.config.S2ConfigFactory
-import org.apache.s2graph.spark.spark.HashMapParam
-import org.apache.spark.streaming.{Seconds, StreamingContext}
-import org.apache.spark.{SparkConf, SparkContext}
-import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
-import play.api.libs.json.Json
-import CounterFunctions.HashMapAccumulable
-import s2.counter.core.TimedQualifier.IntervalUnit
-import s2.counter.core._
-import s2.counter.core.v2.{ExactStorageGraph, GraphOperation, 
RankingStorageGraph}
-import s2.helper.CounterAdmin
-import s2.models.{Counter, DBModel}
-
-import scala.collection.mutable.{HashMap => MutableHashMap}
-import scala.concurrent.ExecutionContext.Implicits.global
-import scala.util.{Failure, Success}
-
-/**
-  * Created by hsleep([email protected]) on 2015. 11. 19..
-  */
-class ExactCounterStreamingSpec extends FlatSpec with Matchers with 
BeforeAndAfterAll {
-  private val master = "local[2]"
-  private val appName = "exact_counter_streaming"
-  private val batchDuration = Seconds(1)
-
-  private var sc: SparkContext = _
-  private var ssc: StreamingContext = _
-
-  val admin = new CounterAdmin(S2ConfigFactory.config)
-  val graphOp = new GraphOperation(S2ConfigFactory.config)
-  val s2config = new S2CounterConfig(S2ConfigFactory.config)
-
-  val exactCounter = new ExactCounter(S2ConfigFactory.config, new 
ExactStorageGraph(S2ConfigFactory.config))
-  val rankingCounter = new RankingCounter(S2ConfigFactory.config, new 
RankingStorageGraph(S2ConfigFactory.config))
-
-  val service = "test"
-  val action = "test_case"
-
-  override def beforeAll(): Unit = {
-    DBModel.initialize(S2ConfigFactory.config)
-
-    val conf = new SparkConf()
-      .setMaster(master)
-      .setAppName(appName)
-
-    ssc = new StreamingContext(conf, batchDuration)
-
-    sc = ssc.sparkContext
-
-    // create test_case label
-    Management.createService(service, s2config.HBASE_ZOOKEEPER_QUORUM, 
s"${service}_dev", 1, None, "gz")
-    if (Label.findByName(action, useCache = false).isEmpty) {
-      val strJs =
-        s"""
-           |{
-           |  "label": "$action",
-           |  "srcServiceName": "$service",
-           |  "srcColumnName": "src",
-           |  "srcColumnType": "string",
-           |  "tgtServiceName": "$service",
-           |  "tgtColumnName": "$action",
-           |  "tgtColumnType": "string",
-           |  "indices": [
-           |  ],
-           |  "props": [
-           |  ]
-           |}
-       """.stripMargin
-      graphOp.createLabel(Json.parse(strJs))
-    }
-
-    // action
-    admin.deleteCounter(service, action).foreach {
-      case Success(v) =>
-      case Failure(ex) =>
-        println(s"$ex")
-    }
-    admin.createCounter(Counter(useFlag = true, 2, service, action, 
Counter.ItemType.STRING, autoComb = true, "is_shared,relationship", useRank = 
true))
-  }
-
-  override def afterAll(): Unit = {
-    admin.deleteCounter(service, action)
-    if (ssc != null) {
-      ssc.stop()
-    }
-  }
-
-  "ExactCounter" should "update" in {
-    val policy = DefaultCounterModel.findByServiceAction(service, action).get
-    val data =
-      s"""
-        |1434534565675 $service        $action 70362200_94013572857366866      
{"is_shared":"false","relationship":"FE"}       
{"userId":"48255079","userIdType":"profile_id","value":"1"}
-        |1434534565675 $service        $action 46889329_94013502934177075      
{"is_shared":"false","relationship":"FE"}       
{"userId":"48255079","userIdType":"profile_id","value":"1"}
-        |1434534566220 $service        $action 51223360_94013140590929619      
{"is_shared":"false","relationship":"FE"}       
{"userId":"312383","userIdType":"profile_id","value":"1"}
-        |1434534566508 $service        $action 63808459_94013420047377826      
{"is_shared":"false","relationship":"FE"}       
{"userId":"21968241","userIdType":"profile_id","value":"1"}
-        |1434534566210 $service        $action 46889329_94013502934177075      
{"is_shared":"false","relationship":"FE"}       
{"userId":"6062217","userIdType":"profile_id","value":"1"}
-        |1434534566459 $service        $action 49699692_94012186431261763      
{"is_shared":"false","relationship":"FE"}       
{"userId":"67863471","userIdType":"profile_id","value":"1"}
-        |1434534565681 $service        $action 64556827_94012311028641810      
{"is_shared":"false","relationship":"FE"}       
{"userId":"19381218","userIdType":"profile_id","value":"1"}
-        |1434534565865 $service        $action 41814266_94012477588942163      
{"is_shared":"false","relationship":"FE"}       
{"userId":"19268547","userIdType":"profile_id","value":"1"}
-        |1434534565865 $service        $action 66697741_94007840665633458      
{"is_shared":"false","relationship":"FE"}       
{"userId":"19268547","userIdType":"profile_id","value":"1"}
-        |1434534566142 $service        $action 66444074_94012737377133826      
{"is_shared":"false","relationship":"FE"}       
{"userId":"11917195","userIdType":"profile_id","value":"1"}
-        |1434534566077 $service        $action 46889329_94013502934177075      
{"is_shared":"false","relationship":"FE"}       
{"userId":"37709890","userIdType":"profile_id","value":"1"}
-        |1434534565938 $service        $action 40921487_94012905738975266      
{"is_shared":"false","relationship":"FE"}       
{"userId":"59869223","userIdType":"profile_id","value":"1"}
-        |1434534566033 $service        $action 64506628_93994707216829506      
{"is_shared":"false","relationship":"FE"}       
{"userId":"50375575","userIdType":"profile_id","value":"1"}
-        |1434534566451 $service        $action 40748868_94013448321919139      
{"is_shared":"false","relationship":"FE"}       
{"userId":"12249539","userIdType":"profile_id","value":"1"}
-        |1434534566669 $service        $action 64499956_94013227717457106      
{"is_shared":"false","relationship":"FE"}       
{"userId":"25167419","userIdType":"profile_id","value":"1"}
-        |1434534566669 $service        $action 66444074_94012737377133826      
{"is_shared":"false","relationship":"FE"}       
{"userId":"25167419","userIdType":"profile_id","value":"1"}
-        |1434534566318 $service        $action 64774665_94012837889027027      
{"is_shared":"true","relationship":"F"} 
{"userId":"71557816","userIdType":"profile_id","value":"1"}
-        |1434534566274 $service        $action 67075480_94008509166933763      
{"is_shared":"false","relationship":"FE"}       
{"userId":"57931860","userIdType":"profile_id","value":"1"}
-        |1434534566659 $service        $action 46889329_94013502934177075      
{"is_shared":"false","relationship":"FE"}       
{"userId":"19990823","userIdType":"profile_id","value":"1"}
-        |1434534566250 $service        $action 70670053_93719933175630611      
{"is_shared":"true","relationship":"F"} 
{"userId":"68897412","userIdType":"profile_id","value":"1"}
-        |1434534566402 $service        $action 46889329_94013502934177075      
{"is_shared":"false","relationship":"FE"}       
{"userId":"15541439","userIdType":"profile_id","value":"1"}
-        |1434534566122 $service        $action 48890741_94013463616012786      
{"is_shared":"false","relationship":"FE"}       
{"userId":"48040409","userIdType":"profile_id","value":"1"}
-        |1434534566055 $service        $action 64509008_94002318232678546      
{"is_shared":"true","relationship":"F"} 
{"userId":"46532039","userIdType":"profile_id","value":"1"}
-        |1434534565994 $service        $action 66644368_94009163363033795      
{"is_shared":"false","relationship":"FE"}       
{"userId":"4143147","userIdType":"profile_id","value":"1"}
-        |1434534566448 $service        $action 64587644_93938555963733954      
{"is_shared":"false","relationship":"FE"}       
{"userId":"689042","userIdType":"profile_id","value":"1"}
-        |1434534565935 $service        $action 52812511_94012009551561315      
{"is_shared":"false","relationship":"FE"}       
{"userId":"35509692","userIdType":"profile_id","value":"1"}
-        |1434534566544 $service        $action 70452048_94008573197583762      
{"is_shared":"false","relationship":"FE"}       
{"userId":"5172421","userIdType":"profile_id","value":"1"}
-        |1434534565929 $service        $action 54547023_94013384964278435      
{"is_shared":"false","relationship":"FE"}       
{"userId":"33556498","userIdType":"profile_id","value":"1"}
-        |1434534566358 $service        $action 46889329_94013502934177075      
{"is_shared":"false","relationship":"FE"}       
{"userId":"8987346","userIdType":"profile_id","value":"1"}
-        |1434534566057 $service        $action 67075480_94008509166933763      
{"is_shared":"false","relationship":"FE"}       
{"userId":"35134964","userIdType":"profile_id","value":"1"}
-        |1434534566140 $service        $action 54547023_94013384964278435      
{"is_shared":"false","relationship":"FE"}       
{"userId":"11900315","userIdType":"profile_id","value":"1"}
-        |1434534566158 $service        $action 64639374_93888330176053635      
{"is_shared":"true","relationship":"F"} 
{"userId":"49996643","userIdType":"profile_id","value":"1"}
-        |1434534566025 $service        $action 67265128_94009084771192002      
{"is_shared":"false","relationship":"FE"}       
{"userId":"37801480","userIdType":"profile_id","value":"1"}
-      """.stripMargin.trim
-    //    println(data)
-    val rdd = sc.parallelize(Seq(("", data)))
-
-    //    rdd.foreachPartition { part =>
-    //      part.foreach(println)
-    //    }
-    val resultRdd = CounterFunctions.makeExactRdd(rdd, 2)
-    val result = resultRdd.collect().toMap
-
-    //    result.foreachPartition { part =>
-    //      part.foreach(println)
-    //    }
-
-    val parsed = {
-      for {
-        line <- GraphUtil.parseString(data)
-        item <- CounterEtlItem(line).toSeq
-        ev <- CounterFunctions.exactMapper(item).toSeq
-      } yield {
-        ev
-      }
-    }
-    val parsedResult = parsed.groupBy(_._1).mapValues(values => 
values.map(_._2).reduce(CounterFunctions.reduceValue[ExactQualifier, Long](_ + 
_, 0L)))
-
-    //    parsedResult.foreach { case (k, v) =>
-    //      println(k, v)
-    //    }
-
-    result should not be empty
-    result should equal (parsedResult)
-
-    val itemId = "46889329_94013502934177075"
-    val key = ExactKey(DefaultCounterModel.findByServiceAction(service, 
action).get, itemId, checkItemType = true)
-    val value = result.get(key)
-
-    value should not be empty
-    value.get.get(ExactQualifier(TimedQualifier("t", 0), Map.empty[String, 
String])) should equal (Some(6L))
-
-    exactCounter.getCount(policy, itemId, Seq(IntervalUnit.TOTAL), 0, 0, 
Map.empty[String, Set[String]]) should be (None)
-
-    val acc: HashMapAccumulable = sc.accumulable(MutableHashMap.empty[String, 
Long], "Throughput")(HashMapParam[String, Long](_ + _))
-    resultRdd.foreachPartition { part =>
-      CounterFunctions.updateExactCounter(part.toSeq, acc)
-    }
-
-    Option(FetchedCountsGrouped(key, Map(
-      (IntervalUnit.TOTAL, Map.empty[String, String]) -> 
Map(ExactQualifier(TimedQualifier("t", 0), "") -> 6l)
-    ))).foreach { expected =>
-      exactCounter.getCount(policy, itemId, Seq(IntervalUnit.TOTAL), 0, 0, 
Map.empty[String, Set[String]]) should be (Some(expected))
-    }
-    Option(FetchedCountsGrouped(key, Map(
-      (IntervalUnit.TOTAL, Map("is_shared" -> "false")) -> 
Map(ExactQualifier(TimedQualifier("t", 0), "is_shared.false") -> 6l)
-    ))).foreach { expected =>
-      exactCounter.getCount(policy, itemId, Seq(IntervalUnit.TOTAL), 0, 0, 
Map("is_shared" -> Set("false"))) should be (Some(expected))
-    }
-    Option(FetchedCountsGrouped(key, Map(
-      (IntervalUnit.TOTAL, Map("relationship" -> "FE")) -> 
Map(ExactQualifier(TimedQualifier("t", 0), "relationship.FE") -> 6l)
-    ))).foreach { expected =>
-      exactCounter.getCount(policy, itemId, Seq(IntervalUnit.TOTAL), 0, 0, 
Map("relationship" -> Set("FE"))) should be (Some(expected))
-    }
-    Option(FetchedCountsGrouped(key, Map(
-      (IntervalUnit.TOTAL, Map("is_shared" -> "false", "relationship" -> 
"FE")) -> Map(ExactQualifier(TimedQualifier("t", 0), 
"is_shared.relationship.false.FE") -> 6l)
-    ))).foreach { expected =>
-      exactCounter.getCount(policy, itemId, Seq(IntervalUnit.TOTAL), 0, 0, 
Map("is_shared" -> Set("false"), "relationship" -> Set("FE"))) should be 
(Some(expected))
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/c2f3fc21/s2counter_loader/src/test/scala/org/apache/s2graph/counter/loader/stream/RankingCounterStreamingSpec.scala
----------------------------------------------------------------------
diff --git a/s2counter_loader/src/test/scala/org/apache/s2graph/counter/loader/stream/RankingCounterStreamingSpec.scala b/s2counter_loader/src/test/scala/org/apache/s2graph/counter/loader/stream/RankingCounterStreamingSpec.scala
deleted file mode 100644
index 87ac4c5..0000000
--- a/s2counter_loader/src/test/scala/org/apache/s2graph/counter/loader/stream/RankingCounterStreamingSpec.scala
+++ /dev/null
@@ -1,470 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.s2graph.counter.loader.stream
-
-import org.apache.s2graph.core.Management
-import org.apache.s2graph.core.mysqls.Label
-import org.apache.s2graph.counter.loader.counter.core.CounterFunctions
-import org.apache.s2graph.counter.loader.models.DefaultCounterModel
-import org.apache.s2graph.spark.config.S2ConfigFactory
-import org.apache.s2graph.spark.spark.HashMapParam
-import org.apache.spark.streaming.{Seconds, StreamingContext}
-import org.apache.spark.{SparkConf, SparkContext}
-import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
-import play.api.libs.json.Json
-import CounterFunctions.HashMapAccumulable
-import s2.counter.core.TimedQualifier.IntervalUnit
-import s2.counter.core._
-import s2.counter.core.v2.{ExactStorageGraph, GraphOperation, 
RankingStorageGraph}
-import s2.helper.CounterAdmin
-import s2.models.{Counter, DBModel}
-
-import scala.collection.mutable.{HashMap => MutableHashMap}
-import scala.concurrent.ExecutionContext.Implicits.global
-import scala.util.{Failure, Success}
-
-/**
- * Created by hsleep([email protected]) on 15. 6. 17..
- */
-class RankingCounterStreamingSpec extends FlatSpec with BeforeAndAfterAll with 
Matchers {
-  private val master = "local[2]"
-  private val appName = "ranking_counter_streaming"
-  private val batchDuration = Seconds(1)
-
-  private var sc: SparkContext = _
-  private var ssc: StreamingContext = _
-
-  val admin = new CounterAdmin(S2ConfigFactory.config)
-  val graphOp = new GraphOperation(S2ConfigFactory.config)
-  val s2config = new S2CounterConfig(S2ConfigFactory.config)
-
-  val exactCounter = new ExactCounter(S2ConfigFactory.config, new 
ExactStorageGraph(S2ConfigFactory.config))
-  val rankingCounter = new RankingCounter(S2ConfigFactory.config, new 
RankingStorageGraph(S2ConfigFactory.config))
-
-  val service = "test"
-  val action = "test_case"
-  val action_base = "test_case_base"
-  val action_rate = "test_case_rate"
-  val action_rate_threshold = "test_case_rate_threshold"
-  val action_trend = "test_case_trend"
-
-  override def beforeAll(): Unit = {
-    DBModel.initialize(S2ConfigFactory.config)
-
-    val conf = new SparkConf()
-      .setMaster(master)
-      .setAppName(appName)
-
-    ssc = new StreamingContext(conf, batchDuration)
-
-    sc = ssc.sparkContext
-
-    admin.setupCounterOnGraph()
-
-    // create test_case label
-    Management.createService(service, s2config.HBASE_ZOOKEEPER_QUORUM, 
s"${service}_dev", 1, None, "gz")
-    if (Label.findByName(action, useCache = false).isEmpty) {
-      val strJs =
-        s"""
-           |{
-           |  "label": "$action",
-           |  "srcServiceName": "$service",
-           |  "srcColumnName": "src",
-           |  "srcColumnType": "string",
-           |  "tgtServiceName": "$service",
-           |  "tgtColumnName": "$action",
-           |  "tgtColumnType": "string",
-           |  "indices": [
-           |  ],
-           |  "props": [
-           |  ]
-           |}
-       """.stripMargin
-      graphOp.createLabel(Json.parse(strJs))
-    }
-    if (Label.findByName(action_base, useCache = false).isEmpty) {
-      val strJs =
-        s"""
-           |{
-           |  "label": "$action_base",
-           |  "srcServiceName": "$service",
-           |  "srcColumnName": "src",
-           |  "srcColumnType": "string",
-           |  "tgtServiceName": "$service",
-           |  "tgtColumnName": "$action",
-           |  "tgtColumnType": "string",
-           |  "indices": [
-           |  ],
-           |  "props": [
-           |  ]
-           |}
-       """.stripMargin
-      graphOp.createLabel(Json.parse(strJs))
-    }
-
-    // action
-    admin.deleteCounter(service, action).foreach {
-      case Success(v) =>
-      case Failure(ex) =>
-        println(s"$ex")
-    }
-    admin.createCounter(Counter(useFlag = true, 2, service, action, 
Counter.ItemType.STRING, autoComb = true, "", useRank = true))
-    val policy = DefaultCounterModel.findByServiceAction(service, action).get
-
-    // action_base
-    admin.deleteCounter(service, action_base).foreach {
-      case Success(v) =>
-      case Failure(ex) =>
-        println(s"$ex")
-    }
-    admin.createCounter(Counter(useFlag = true, 2, service, action_base, 
Counter.ItemType.STRING, autoComb = true, "", useRank = true))
-    val basePolicy = DefaultCounterModel.findByServiceAction(service, 
action_base).get
-
-    // action_rate
-    admin.deleteCounter(service, action_rate).foreach {
-      case Success(v) =>
-      case Failure(ex) =>
-        println(s"$ex")
-    }
-    admin.createCounter(Counter(useFlag = true, 2, service, action_rate, 
Counter.ItemType.STRING, autoComb = true, "gender,p1", useRank = true,
-      rateActionId = Some(policy.id), rateBaseId = Some(basePolicy.id)))
-
-    // action_rate_threshold
-    admin.deleteCounter(service, action_rate_threshold).foreach {
-      case Success(v) =>
-      case Failure(ex) =>
-        println(s"$ex")
-    }
-    admin.createCounter(Counter(useFlag = true, 2, service, 
action_rate_threshold, Counter.ItemType.STRING, autoComb = true, "gender,p1", 
useRank = true,
-      rateActionId = Some(policy.id), rateBaseId = Some(basePolicy.id), 
rateThreshold = Some(3)))
-
-    // action_trend
-    admin.deleteCounter(service, action_trend).foreach {
-      case Success(v) =>
-      case Failure(ex) =>
-        println(s"$ex")
-    }
-    admin.createCounter(Counter(useFlag = true, 2, service, action_trend, 
Counter.ItemType.STRING, autoComb = true, "p1", useRank = true,
-      rateActionId = Some(policy.id), rateBaseId = Some(policy.id)))
- }
-
-  override def afterAll(): Unit = {
-    admin.deleteCounter(service, action)
-    admin.deleteCounter(service, action_base)
-    admin.deleteCounter(service, action_rate)
-    admin.deleteCounter(service, action_rate_threshold)
-    admin.deleteCounter(service, action_trend)
-    if (ssc != null) {
-      ssc.stop()
-    }
-  }
-
-  "RankingCounterStreaming" should "update" in {
-    val policy = DefaultCounterModel.findByServiceAction(service, action).get
-//    val basePolicy = DefaultCounterModel.findByServiceAction(service, 
action_base).get
-
-    rankingCounter.ready(policy) should equal (true)
-    val data =
-      s"""
-         
|{"success":true,"policyId":${policy.id},"item":"1","results":[{"interval":"M","dimension":"","ts":1433084400000,"value":1,"result":3}]}
-         
|{"success":false,"policyId":${policy.id},"item":"2","results":[{"interval":"M","dimension":"","ts":1433084400000,"value":1,"result":2}]}
-         
|{"success":true,"policyId":${policy.id},"item":"3","results":[{"interval":"M","dimension":"","ts":1433084400000,"value":1,"result":1}]}
-         
|{"success":true,"policyId":${policy.id},"item":"3","results":[{"interval":"M","dimension":"","ts":1433084400000,"value":1,"result":2}]}
-         
|{"success":true,"policyId":${policy.id},"item":"4","results":[{"interval":"M","dimension":"","ts":1433084400000,"value":1,"result":1}]}
-      """.stripMargin.trim
-    //    println(data)
-    val rdd = sc.parallelize(Seq(("", data)))
-
-    //    rdd.foreachPartition { part =>
-    //      part.foreach(println)
-    //    }
-
-    val result = CounterFunctions.makeRankingRdd(rdd, 2).collect().toMap
-
-    //    result.foreachPartition { part =>
-    //      part.foreach(println)
-    //    }
-
-    val acc: HashMapAccumulable = sc.accumulable(MutableHashMap.empty[String, 
Long], "Throughput")(HashMapParam[String, Long](_ + _))
-
-    result should not be empty
-    val rankKey = RankingKey(policy.id, policy.version, 
ExactQualifier(TimedQualifier("M", 1433084400000L), ""))
-    result should contain (rankKey -> Map(
-      "1" -> RankingValue(3, 1),
-      "3" -> RankingValue(2, 2),
-      "4" -> RankingValue(1, 1)
-    ))
-
-    val key = RankingKey(policy.id, policy.version, 
ExactQualifier(TimedQualifier("M", 1433084400000L), ""))
-    val value = result.get(key)
-
-    value should not be empty
-    value.get.get("1").get should equal (RankingValue(3, 1))
-    value.get.get("2") shouldBe empty
-    value.get.get("3").get should equal (RankingValue(2, 2))
-
-    rankingCounter.ready(policy) should equal (true)
-
-    // delete, update and get
-    rankingCounter.delete(key)
-    Thread.sleep(1000)
-    CounterFunctions.updateRankingCounter(Seq((key, value.get)), acc)
-    Thread.sleep(1000)
-    val rst = rankingCounter.getTopK(key)
-
-    rst should not be empty
-//    rst.get.totalScore should equal(4f)
-    rst.get.values should contain allOf(("3", 2d), ("4", 1d), ("1", 3d))
-  }
-
-//  "rate by base" >> {
-//    val data =
-//      """
-//        
|{"success":true,"policyId":42,"item":"2","results":[{"interval":"M","dimension":"","ts":1433084400000,"value":2,"result":4}]}
-//      """.stripMargin.trim
-//    val rdd = sc.parallelize(Seq(("", data)))
-//
-//    val trxLogRdd = CounterFunctions.makeTrxLogRdd(rdd, 2).collect()
-//    trxLogRdd.foreach { log =>
-//      CounterFunctions.rateBaseRankingMapper(log) must not be empty
-//    }
-//
-//    true must_== true
-//  }
-
-  it should "update rate ranking counter" in {
-    val policy = DefaultCounterModel.findByServiceAction(service, action).get
-    val basePolicy = DefaultCounterModel.findByServiceAction(service, 
action_base).get
-    val ratePolicy = DefaultCounterModel.findByServiceAction(service, 
action_rate).get
-
-    // update base policy
-    val eq = ExactQualifier(TimedQualifier("M", 1433084400000l), "")
-    val exactKey = ExactKey(basePolicy, "1", checkItemType = true)
-
-    // check base item count
-    exactCounter.updateCount(basePolicy, Seq(
-      (exactKey, Map(eq -> 2l))
-    ))
-    Thread.sleep(1000)
-
-    // direct get
-    val baseCount = exactCounter.getCount(basePolicy, "1", 
Seq(IntervalUnit.MONTHLY), 1433084400000l, 1433084400000l, Map.empty[String, 
Set[String]])
-    baseCount should not be empty
-    baseCount.get should equal (FetchedCountsGrouped(exactKey, Map(
-      (eq.tq.q, Map.empty[String, String]) -> Map(eq-> 2l)
-    )))
-
-    // related get
-    val relatedCount = exactCounter.getRelatedCounts(basePolicy, Seq("1" -> 
Seq(eq)))
-    relatedCount should not be empty
-    relatedCount.get("1") should not be empty
-    relatedCount.get("1").get should equal (Map(eq -> 2l))
-
-    val data =
-      s"""
-        
|{"success":true,"policyId":${policy.id},"item":"2","results":[{"interval":"M","dimension":"","ts":1433084400000,"value":1,"result":1}]}
-        
|{"success":true,"policyId":${policy.id},"item":"2","results":[{"interval":"M","dimension":"gender.M","ts":1433084400000,"value":1,"result":1}]}
-        
|{"success":true,"policyId":${basePolicy.id},"item":"2","results":[{"interval":"M","dimension":"","ts":1433084400000,"value":2,"result":4}]}
-        
|{"success":true,"policyId":${basePolicy.id},"item":"2","results":[{"interval":"M","dimension":"gender.M","ts":1433084400000,"value":2,"result":4}]}
-        
|{"success":true,"policyId":${policy.id},"item":"2","results":[{"interval":"M","dimension":"p1.1","ts":1433084400000,"value":1,"result":1}]}
-        
|{"success":true,"policyId":${policy.id},"item":"1","results":[{"interval":"M","dimension":"","ts":1433084400000,"value":1,"result":1}]}
-        
|{"success":true,"policyId":${basePolicy.id},"item":"2","results":[{"interval":"M","dimension":"p1.1","ts":1433084400000,"value":2,"result":4}]}
-        
|{"success":true,"policyId":${policy.id},"item":"1","results":[{"interval":"M","dimension":"","ts":1433084400000,"value":1,"result":2}]}
-      """.stripMargin.trim
-    //    println(data)
-    val rdd = sc.parallelize(Seq(("", data)))
-
-    //    rdd.foreachPartition { part =>
-    //      part.foreach(println)
-    //    }
-
-    val trxLogRdd = CounterFunctions.makeTrxLogRdd(rdd, 2)
-    trxLogRdd.count() should equal (data.trim.split('\n').length)
-
-    val itemRankingRdd = CounterFunctions.makeItemRankingRdd(trxLogRdd, 2)
-    itemRankingRdd.foreach(println)
-
-    val result = CounterFunctions.rateRankingCount(itemRankingRdd, 
2).collect().toMap.filterKeys(key => key.policyId == ratePolicy.id)
-    result.foreach(println)
-    result should have size 3
-
-    val acc: HashMapAccumulable = sc.accumulable(MutableHashMap.empty[String, 
Long], "Throughput")(HashMapParam[String, Long](_ + _))
-
-    // rate ranking
-    val key = RankingKey(ratePolicy.id, 2, ExactQualifier(TimedQualifier("M", 
1433084400000L), ""))
-    val value = result.get(key)
-
-//    println(key, value)
-
-    value should not be empty
-    value.get.get("1") should not be empty
-    value.get.get("1").get should equal (RankingValue(1, 0))
-    value.get.get("2").get should equal (RankingValue(0.25, 0))
-
-    val key2 = RankingKey(ratePolicy.id, 2, ExactQualifier(TimedQualifier("M", 
1433084400000L), "p1.1"))
-    val value2 = result.get(key2)
-
-//    println(key2, value2)
-
-    val values = value.map(v => (key, v)).toSeq ++ value2.map(v => (key2, 
v)).toSeq
-    println(s"values: $values")
-
-    // delete, update and get
-    rankingCounter.delete(key)
-    rankingCounter.delete(key2)
-    Thread.sleep(1000)
-    CounterFunctions.updateRankingCounter(values, acc)
-    // for update graph
-    Thread.sleep(1000)
-
-    val rst = rankingCounter.getTopK(key)
-    rst should not be empty
-    rst.get.values should equal (Seq(("1", 1d), ("2", 0.25d)))
-
-    val rst2 = rankingCounter.getTopK(key2)
-    rst2 should not be empty
-    rst2.get.values should equal (Seq(("2", 0.25d)))
-  }
-
-  it should "update rate ranking counter with threshold" in {
-    val policy = DefaultCounterModel.findByServiceAction(service, action).get
-    val basePolicy = DefaultCounterModel.findByServiceAction(service, 
action_base).get
-    val ratePolicy = DefaultCounterModel.findByServiceAction(service, 
action_rate_threshold).get
-
-    val data =
-      s"""
-        
|{"success":true,"policyId":${policy.id},"item":"1","results":[{"interval":"M","dimension":"","ts":1433084400000,"value":1,"result":1}]}
-        
|{"success":true,"policyId":${policy.id},"item":"1","results":[{"interval":"M","dimension":"","ts":1433084400000,"value":1,"result":2}]}
-        
|{"success":true,"policyId":${policy.id},"item":"2","results":[{"interval":"M","dimension":"","ts":1433084400000,"value":1,"result":1}]}
-        
|{"success":true,"policyId":${policy.id},"item":"2","results":[{"interval":"M","dimension":"gender.M","ts":1433084400000,"value":1,"result":1}]}
-        
|{"success":true,"policyId":${basePolicy.id},"item":"2","results":[{"interval":"M","dimension":"","ts":1433084400000,"value":2,"result":4}]}
-        
|{"success":true,"policyId":${basePolicy.id},"item":"2","results":[{"interval":"M","dimension":"gender.M","ts":1433084400000,"value":2,"result":4}]}
-      """.stripMargin.trim
-    //    println(data)
-    val rdd = sc.parallelize(Seq(("", data)))
-
-    //    rdd.foreachPartition { part =>
-    //      part.foreach(println)
-    //    }
-
-    val trxLogRdd = CounterFunctions.makeTrxLogRdd(rdd, 2)
-    trxLogRdd.count() should equal (data.trim.split('\n').length)
-
-    val itemRankingRdd = CounterFunctions.makeItemRankingRdd(trxLogRdd, 2)
-    itemRankingRdd.foreach(println)
-
-    val result = CounterFunctions.rateRankingCount(itemRankingRdd, 
2).collect().toMap.filterKeys(key => key.policyId == ratePolicy.id)
-    result.foreach(println)
-    result should have size 2
-
-    val acc: HashMapAccumulable = sc.accumulable(MutableHashMap.empty[String, 
Long], "Throughput")(HashMapParam[String, Long](_ + _))
-
-    // rate ranking
-    val key = RankingKey(ratePolicy.id, 2, ExactQualifier(TimedQualifier("M", 
1433084400000L), ""))
-    val value = result.get(key)
-
-    value should not be empty
-    value.get.get("1") should be (None)
-    value.get.get("2").get should equal (RankingValue(0.25, 0))
-
-    // delete, update and get
-    rankingCounter.delete(key)
-    Thread.sleep(1000)
-    CounterFunctions.updateRankingCounter(Seq((key, value.get)), acc)
-    Thread.sleep(1000)
-    val rst = rankingCounter.getTopK(key)
-
-    rst should not be empty
-    rst.get.values should equal (Seq(("2", 0.25d)))
-  }
-
-  it should "update trend ranking counter" in {
-    val policy = DefaultCounterModel.findByServiceAction(service, action).get
-    val trendPolicy = DefaultCounterModel.findByServiceAction(service, 
action_trend).get
-
-    val exactKey1 = ExactKey(policy, "1", checkItemType = true)
-    val exactKey2 = ExactKey(policy, "2", checkItemType = true)
-    // update old key value
-    val tq1 = TimedQualifier("M", 1435676400000l)
-    val tq2 = TimedQualifier("M", 1427814000000l)
-    exactCounter.updateCount(policy, Seq(
-      exactKey1 -> Map(ExactQualifier(tq1.add(-1), "") -> 1l, 
ExactQualifier(tq2.add(-1), "") -> 92l)
-    ))
-    val eq1 = ExactQualifier(tq1, "")
-    val eq2 = ExactQualifier(tq2, "")
-
-    val oldCount = exactCounter.getPastCounts(policy, Seq("1" -> Seq(eq1, 
eq2), "2" -> Seq(eq1, eq1.copy(dimension = "gender.M"))))
-    oldCount should not be empty
-    oldCount.get("1").get should equal(Map(eq1 -> 1l, eq2 -> 92l))
-    oldCount.get("2") should be (None)
-
-    val data =
-      s"""
-        
|{"success":true,"policyId":${policy.id},"item":"1","results":[{"interval":"M","dimension":"","ts":1435676400000,"value":1,"result":1}]}
-        
|{"success":true,"policyId":${policy.id},"item":"1","results":[{"interval":"M","dimension":"","ts":1435676400000,"value":1,"result":2}]}
-        
|{"success":true,"policyId":${policy.id},"item":"2","results":[{"interval":"M","dimension":"","ts":1435676400000,"value":1,"result":1}]}
-        
|{"success":true,"policyId":${policy.id},"item":"2","results":[{"interval":"M","dimension":"gender.M","ts":1435676400000,"value":1,"result":1}]}
-        
|{"success":true,"policyId":${policy.id},"item":"1","results":[{"interval":"M","dimension":"","ts":1427814000000,"value":1,"result":92}]}
-      """.stripMargin.trim
-    //    println(data)
-    val rdd = sc.parallelize(Seq(("", data)))
-
-    //    rdd.foreachPartition { part =>
-    //      part.foreach(println)
-    //    }
-
-    val trxLogRdd = CounterFunctions.makeTrxLogRdd(rdd, 2)
-    trxLogRdd.count() should equal (data.trim.split('\n').length)
-
-    val itemRankingRdd = CounterFunctions.makeItemRankingRdd(trxLogRdd, 2)
-    itemRankingRdd.foreach(println)
-
-    val result = CounterFunctions.trendRankingCount(itemRankingRdd, 
2).collect().toMap
-    result.foreach(println)
-    // dimension gender.M is ignored, because gender is not defined dimension 
in trend policy.
-    result should have size 2
-
-    val acc: HashMapAccumulable = sc.accumulable(MutableHashMap.empty[String, 
Long], "Throughput")(HashMapParam[String, Long](_ + _))
-
-    // trend ranking
-    val key = RankingKey(trendPolicy.id, 2, ExactQualifier(TimedQualifier("M", 
1435676400000L), ""))
-    val value = result.get(key)
-
-    value should not be empty
-    value.get.get("1").get should equal (RankingValue(2, 0))
-    value.get.get("2").get should equal (RankingValue(1, 0))
-
-    val key2 = RankingKey(trendPolicy.id, 2, 
ExactQualifier(TimedQualifier("M", 1427814000000L), ""))
-    val value2 = result.get(key2)
-
-    value2 should not be empty
-    value2.get.get("1").get should equal (RankingValue(1, 0))
-
-    // delete, update and get
-    rankingCounter.delete(key)
-    Thread.sleep(1000)
-    CounterFunctions.updateRankingCounter(Seq((key, value.get)), acc)
-    Thread.sleep(1000)
-    val rst = rankingCounter.getTopK(key)
-
-    rst should not be empty
-    rst.get.values should equal (Seq("1" -> 2, "2" -> 1))
-  }
-}


Reply via email to