Repository: incubator-s2graph
Updated Branches:
refs/heads/master 09a7919de -> c2f3fc21b
[S2GRAPH-84]: Test-case compilation error on `s2counter_loader` project.
JIRA:
[S2GRAPH-84] https://issues.apache.org/jira/browse/S2GRAPH-84
Pull Request:
Closes #56
Authors:
Jaesang Kim: <[email protected]>
Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/c2f3fc21
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/c2f3fc21
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/c2f3fc21
Branch: refs/heads/master
Commit: c2f3fc21b40bf21611e242c63146e847de28f5e5
Parents: 09a7919
Author: DO YUNG YOON <[email protected]>
Authored: Thu Sep 1 06:53:42 2016 +0900
Committer: DO YUNG YOON <[email protected]>
Committed: Thu Sep 1 06:53:42 2016 +0900
----------------------------------------------------------------------
CHANGES | 2 +
.../loader/core/CounterEtlFunctionsSpec.scala | 27 +-
.../loader/core/DimensionPropsTest.scala | 3 -
.../stream/ExactCounterStreamingSpec.scala | 217 ---------
.../stream/RankingCounterStreamingSpec.scala | 470 -------------------
5 files changed, 23 insertions(+), 696 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/c2f3fc21/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index ebb11c6..7fd4600 100644
--- a/CHANGES
+++ b/CHANGES
@@ -174,6 +174,8 @@ Release 0.12.1 - unreleased
S2GRAPH-103: Remove dependencies on custom fork of asynchbase (Committed
by DOYUNG YOON).
S2GRAPH-102: Add more configuration optoins in application.conf (Committed
by DOYUNG YOON).
+
+ S2GRAPH-84: Test-case compilation error on `s2counter_loader` project
(Committed by Jaesang Kim).
TEST
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/c2f3fc21/s2counter_loader/src/test/scala/org/apache/s2graph/counter/loader/core/CounterEtlFunctionsSpec.scala
----------------------------------------------------------------------
diff --git
a/s2counter_loader/src/test/scala/org/apache/s2graph/counter/loader/core/CounterEtlFunctionsSpec.scala
b/s2counter_loader/src/test/scala/org/apache/s2graph/counter/loader/core/CounterEtlFunctionsSpec.scala
index 5b41434..8ae1de3 100644
---
a/s2counter_loader/src/test/scala/org/apache/s2graph/counter/loader/core/CounterEtlFunctionsSpec.scala
+++
b/s2counter_loader/src/test/scala/org/apache/s2graph/counter/loader/core/CounterEtlFunctionsSpec.scala
@@ -20,15 +20,30 @@
package org.apache.s2graph.counter.loader.core
import com.typesafe.config.ConfigFactory
-import org.scalatest.{Matchers, BeforeAndAfterAll, FlatSpec}
-import s2.models.DBModel
+import org.apache.s2graph.core.mysqls.{Label, Service}
+import org.apache.s2graph.core.types.HBaseType
+import org.apache.s2graph.core.{Graph, Management}
+import org.apache.s2graph.counter.models.DBModel
+import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
+
+import scala.concurrent.ExecutionContext.Implicits.global
-/**
- * Created by hsleep([email protected]) on 15. 7. 3..
- */
class CounterEtlFunctionsSpec extends FlatSpec with BeforeAndAfterAll with
Matchers {
+ val config = ConfigFactory.load()
+ val cluster = config.getString("hbase.zookeeper.quorum")
+ DBModel.initialize(config)
+
+ val graph = new Graph(config)(global)
+ val management = new Management(graph)
+
override def beforeAll: Unit = {
- DBModel.initialize(ConfigFactory.load())
+ management.createService("test", cluster, "test", 1, None, "gz")
+ management.createLabel("test_case", "test", "src", "string", "test",
"tgt", "string", true, "test", Nil, Nil, "weak", None, None,
HBaseType.DEFAULT_VERSION, false, "gz")
+ }
+
+ override def afterAll: Unit = {
+ Label.delete(Label.findByName("test_case", false).get.id.get)
+ Service.delete(Service.findByName("test", false).get.id.get)
}
"CounterEtlFunctions" should "parsing log" in {
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/c2f3fc21/s2counter_loader/src/test/scala/org/apache/s2graph/counter/loader/core/DimensionPropsTest.scala
----------------------------------------------------------------------
diff --git
a/s2counter_loader/src/test/scala/org/apache/s2graph/counter/loader/core/DimensionPropsTest.scala
b/s2counter_loader/src/test/scala/org/apache/s2graph/counter/loader/core/DimensionPropsTest.scala
index 1bac7e1..e2b5ef9 100644
---
a/s2counter_loader/src/test/scala/org/apache/s2graph/counter/loader/core/DimensionPropsTest.scala
+++
b/s2counter_loader/src/test/scala/org/apache/s2graph/counter/loader/core/DimensionPropsTest.scala
@@ -23,9 +23,6 @@ import org.scalatest.{FunSuite, Matchers}
import scala.collection.mutable.ListBuffer
-/**
- * Created by hsleep([email protected]) on 2015. 10. 7..
- */
class DimensionPropsTest extends FunSuite with Matchers {
test("makeRequestBody with Seq") {
val requestBody =
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/c2f3fc21/s2counter_loader/src/test/scala/org/apache/s2graph/counter/loader/stream/ExactCounterStreamingSpec.scala
----------------------------------------------------------------------
diff --git
a/s2counter_loader/src/test/scala/org/apache/s2graph/counter/loader/stream/ExactCounterStreamingSpec.scala
b/s2counter_loader/src/test/scala/org/apache/s2graph/counter/loader/stream/ExactCounterStreamingSpec.scala
deleted file mode 100644
index 76ad80d..0000000
---
a/s2counter_loader/src/test/scala/org/apache/s2graph/counter/loader/stream/ExactCounterStreamingSpec.scala
+++ /dev/null
@@ -1,217 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.s2graph.counter.loader.stream
-
-import org.apache.s2graph.core.{Management, GraphUtil}
-import org.apache.s2graph.core.mysqls.Label
-import org.apache.s2graph.counter.loader.counter.core.{CounterEtlItem,
CounterFunctions}
-import org.apache.s2graph.counter.loader.models.DefaultCounterModel
-import org.apache.s2graph.spark.config.S2ConfigFactory
-import org.apache.s2graph.spark.spark.HashMapParam
-import org.apache.spark.streaming.{Seconds, StreamingContext}
-import org.apache.spark.{SparkConf, SparkContext}
-import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
-import play.api.libs.json.Json
-import CounterFunctions.HashMapAccumulable
-import s2.counter.core.TimedQualifier.IntervalUnit
-import s2.counter.core._
-import s2.counter.core.v2.{ExactStorageGraph, GraphOperation,
RankingStorageGraph}
-import s2.helper.CounterAdmin
-import s2.models.{Counter, DBModel}
-
-import scala.collection.mutable.{HashMap => MutableHashMap}
-import scala.concurrent.ExecutionContext.Implicits.global
-import scala.util.{Failure, Success}
-
-/**
- * Created by hsleep([email protected]) on 2015. 11. 19..
- */
-class ExactCounterStreamingSpec extends FlatSpec with Matchers with
BeforeAndAfterAll {
- private val master = "local[2]"
- private val appName = "exact_counter_streaming"
- private val batchDuration = Seconds(1)
-
- private var sc: SparkContext = _
- private var ssc: StreamingContext = _
-
- val admin = new CounterAdmin(S2ConfigFactory.config)
- val graphOp = new GraphOperation(S2ConfigFactory.config)
- val s2config = new S2CounterConfig(S2ConfigFactory.config)
-
- val exactCounter = new ExactCounter(S2ConfigFactory.config, new
ExactStorageGraph(S2ConfigFactory.config))
- val rankingCounter = new RankingCounter(S2ConfigFactory.config, new
RankingStorageGraph(S2ConfigFactory.config))
-
- val service = "test"
- val action = "test_case"
-
- override def beforeAll(): Unit = {
- DBModel.initialize(S2ConfigFactory.config)
-
- val conf = new SparkConf()
- .setMaster(master)
- .setAppName(appName)
-
- ssc = new StreamingContext(conf, batchDuration)
-
- sc = ssc.sparkContext
-
- // create test_case label
- Management.createService(service, s2config.HBASE_ZOOKEEPER_QUORUM,
s"${service}_dev", 1, None, "gz")
- if (Label.findByName(action, useCache = false).isEmpty) {
- val strJs =
- s"""
- |{
- | "label": "$action",
- | "srcServiceName": "$service",
- | "srcColumnName": "src",
- | "srcColumnType": "string",
- | "tgtServiceName": "$service",
- | "tgtColumnName": "$action",
- | "tgtColumnType": "string",
- | "indices": [
- | ],
- | "props": [
- | ]
- |}
- """.stripMargin
- graphOp.createLabel(Json.parse(strJs))
- }
-
- // action
- admin.deleteCounter(service, action).foreach {
- case Success(v) =>
- case Failure(ex) =>
- println(s"$ex")
- }
- admin.createCounter(Counter(useFlag = true, 2, service, action,
Counter.ItemType.STRING, autoComb = true, "is_shared,relationship", useRank =
true))
- }
-
- override def afterAll(): Unit = {
- admin.deleteCounter(service, action)
- if (ssc != null) {
- ssc.stop()
- }
- }
-
- "ExactCounter" should "update" in {
- val policy = DefaultCounterModel.findByServiceAction(service, action).get
- val data =
- s"""
- |1434534565675 $service $action 70362200_94013572857366866
{"is_shared":"false","relationship":"FE"}
{"userId":"48255079","userIdType":"profile_id","value":"1"}
- |1434534565675 $service $action 46889329_94013502934177075
{"is_shared":"false","relationship":"FE"}
{"userId":"48255079","userIdType":"profile_id","value":"1"}
- |1434534566220 $service $action 51223360_94013140590929619
{"is_shared":"false","relationship":"FE"}
{"userId":"312383","userIdType":"profile_id","value":"1"}
- |1434534566508 $service $action 63808459_94013420047377826
{"is_shared":"false","relationship":"FE"}
{"userId":"21968241","userIdType":"profile_id","value":"1"}
- |1434534566210 $service $action 46889329_94013502934177075
{"is_shared":"false","relationship":"FE"}
{"userId":"6062217","userIdType":"profile_id","value":"1"}
- |1434534566459 $service $action 49699692_94012186431261763
{"is_shared":"false","relationship":"FE"}
{"userId":"67863471","userIdType":"profile_id","value":"1"}
- |1434534565681 $service $action 64556827_94012311028641810
{"is_shared":"false","relationship":"FE"}
{"userId":"19381218","userIdType":"profile_id","value":"1"}
- |1434534565865 $service $action 41814266_94012477588942163
{"is_shared":"false","relationship":"FE"}
{"userId":"19268547","userIdType":"profile_id","value":"1"}
- |1434534565865 $service $action 66697741_94007840665633458
{"is_shared":"false","relationship":"FE"}
{"userId":"19268547","userIdType":"profile_id","value":"1"}
- |1434534566142 $service $action 66444074_94012737377133826
{"is_shared":"false","relationship":"FE"}
{"userId":"11917195","userIdType":"profile_id","value":"1"}
- |1434534566077 $service $action 46889329_94013502934177075
{"is_shared":"false","relationship":"FE"}
{"userId":"37709890","userIdType":"profile_id","value":"1"}
- |1434534565938 $service $action 40921487_94012905738975266
{"is_shared":"false","relationship":"FE"}
{"userId":"59869223","userIdType":"profile_id","value":"1"}
- |1434534566033 $service $action 64506628_93994707216829506
{"is_shared":"false","relationship":"FE"}
{"userId":"50375575","userIdType":"profile_id","value":"1"}
- |1434534566451 $service $action 40748868_94013448321919139
{"is_shared":"false","relationship":"FE"}
{"userId":"12249539","userIdType":"profile_id","value":"1"}
- |1434534566669 $service $action 64499956_94013227717457106
{"is_shared":"false","relationship":"FE"}
{"userId":"25167419","userIdType":"profile_id","value":"1"}
- |1434534566669 $service $action 66444074_94012737377133826
{"is_shared":"false","relationship":"FE"}
{"userId":"25167419","userIdType":"profile_id","value":"1"}
- |1434534566318 $service $action 64774665_94012837889027027
{"is_shared":"true","relationship":"F"}
{"userId":"71557816","userIdType":"profile_id","value":"1"}
- |1434534566274 $service $action 67075480_94008509166933763
{"is_shared":"false","relationship":"FE"}
{"userId":"57931860","userIdType":"profile_id","value":"1"}
- |1434534566659 $service $action 46889329_94013502934177075
{"is_shared":"false","relationship":"FE"}
{"userId":"19990823","userIdType":"profile_id","value":"1"}
- |1434534566250 $service $action 70670053_93719933175630611
{"is_shared":"true","relationship":"F"}
{"userId":"68897412","userIdType":"profile_id","value":"1"}
- |1434534566402 $service $action 46889329_94013502934177075
{"is_shared":"false","relationship":"FE"}
{"userId":"15541439","userIdType":"profile_id","value":"1"}
- |1434534566122 $service $action 48890741_94013463616012786
{"is_shared":"false","relationship":"FE"}
{"userId":"48040409","userIdType":"profile_id","value":"1"}
- |1434534566055 $service $action 64509008_94002318232678546
{"is_shared":"true","relationship":"F"}
{"userId":"46532039","userIdType":"profile_id","value":"1"}
- |1434534565994 $service $action 66644368_94009163363033795
{"is_shared":"false","relationship":"FE"}
{"userId":"4143147","userIdType":"profile_id","value":"1"}
- |1434534566448 $service $action 64587644_93938555963733954
{"is_shared":"false","relationship":"FE"}
{"userId":"689042","userIdType":"profile_id","value":"1"}
- |1434534565935 $service $action 52812511_94012009551561315
{"is_shared":"false","relationship":"FE"}
{"userId":"35509692","userIdType":"profile_id","value":"1"}
- |1434534566544 $service $action 70452048_94008573197583762
{"is_shared":"false","relationship":"FE"}
{"userId":"5172421","userIdType":"profile_id","value":"1"}
- |1434534565929 $service $action 54547023_94013384964278435
{"is_shared":"false","relationship":"FE"}
{"userId":"33556498","userIdType":"profile_id","value":"1"}
- |1434534566358 $service $action 46889329_94013502934177075
{"is_shared":"false","relationship":"FE"}
{"userId":"8987346","userIdType":"profile_id","value":"1"}
- |1434534566057 $service $action 67075480_94008509166933763
{"is_shared":"false","relationship":"FE"}
{"userId":"35134964","userIdType":"profile_id","value":"1"}
- |1434534566140 $service $action 54547023_94013384964278435
{"is_shared":"false","relationship":"FE"}
{"userId":"11900315","userIdType":"profile_id","value":"1"}
- |1434534566158 $service $action 64639374_93888330176053635
{"is_shared":"true","relationship":"F"}
{"userId":"49996643","userIdType":"profile_id","value":"1"}
- |1434534566025 $service $action 67265128_94009084771192002
{"is_shared":"false","relationship":"FE"}
{"userId":"37801480","userIdType":"profile_id","value":"1"}
- """.stripMargin.trim
- // println(data)
- val rdd = sc.parallelize(Seq(("", data)))
-
- // rdd.foreachPartition { part =>
- // part.foreach(println)
- // }
- val resultRdd = CounterFunctions.makeExactRdd(rdd, 2)
- val result = resultRdd.collect().toMap
-
- // result.foreachPartition { part =>
- // part.foreach(println)
- // }
-
- val parsed = {
- for {
- line <- GraphUtil.parseString(data)
- item <- CounterEtlItem(line).toSeq
- ev <- CounterFunctions.exactMapper(item).toSeq
- } yield {
- ev
- }
- }
- val parsedResult = parsed.groupBy(_._1).mapValues(values =>
values.map(_._2).reduce(CounterFunctions.reduceValue[ExactQualifier, Long](_ +
_, 0L)))
-
- // parsedResult.foreach { case (k, v) =>
- // println(k, v)
- // }
-
- result should not be empty
- result should equal (parsedResult)
-
- val itemId = "46889329_94013502934177075"
- val key = ExactKey(DefaultCounterModel.findByServiceAction(service,
action).get, itemId, checkItemType = true)
- val value = result.get(key)
-
- value should not be empty
- value.get.get(ExactQualifier(TimedQualifier("t", 0), Map.empty[String,
String])) should equal (Some(6L))
-
- exactCounter.getCount(policy, itemId, Seq(IntervalUnit.TOTAL), 0, 0,
Map.empty[String, Set[String]]) should be (None)
-
- val acc: HashMapAccumulable = sc.accumulable(MutableHashMap.empty[String,
Long], "Throughput")(HashMapParam[String, Long](_ + _))
- resultRdd.foreachPartition { part =>
- CounterFunctions.updateExactCounter(part.toSeq, acc)
- }
-
- Option(FetchedCountsGrouped(key, Map(
- (IntervalUnit.TOTAL, Map.empty[String, String]) ->
Map(ExactQualifier(TimedQualifier("t", 0), "") -> 6l)
- ))).foreach { expected =>
- exactCounter.getCount(policy, itemId, Seq(IntervalUnit.TOTAL), 0, 0,
Map.empty[String, Set[String]]) should be (Some(expected))
- }
- Option(FetchedCountsGrouped(key, Map(
- (IntervalUnit.TOTAL, Map("is_shared" -> "false")) ->
Map(ExactQualifier(TimedQualifier("t", 0), "is_shared.false") -> 6l)
- ))).foreach { expected =>
- exactCounter.getCount(policy, itemId, Seq(IntervalUnit.TOTAL), 0, 0,
Map("is_shared" -> Set("false"))) should be (Some(expected))
- }
- Option(FetchedCountsGrouped(key, Map(
- (IntervalUnit.TOTAL, Map("relationship" -> "FE")) ->
Map(ExactQualifier(TimedQualifier("t", 0), "relationship.FE") -> 6l)
- ))).foreach { expected =>
- exactCounter.getCount(policy, itemId, Seq(IntervalUnit.TOTAL), 0, 0,
Map("relationship" -> Set("FE"))) should be (Some(expected))
- }
- Option(FetchedCountsGrouped(key, Map(
- (IntervalUnit.TOTAL, Map("is_shared" -> "false", "relationship" ->
"FE")) -> Map(ExactQualifier(TimedQualifier("t", 0),
"is_shared.relationship.false.FE") -> 6l)
- ))).foreach { expected =>
- exactCounter.getCount(policy, itemId, Seq(IntervalUnit.TOTAL), 0, 0,
Map("is_shared" -> Set("false"), "relationship" -> Set("FE"))) should be
(Some(expected))
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/c2f3fc21/s2counter_loader/src/test/scala/org/apache/s2graph/counter/loader/stream/RankingCounterStreamingSpec.scala
----------------------------------------------------------------------
diff --git
a/s2counter_loader/src/test/scala/org/apache/s2graph/counter/loader/stream/RankingCounterStreamingSpec.scala
b/s2counter_loader/src/test/scala/org/apache/s2graph/counter/loader/stream/RankingCounterStreamingSpec.scala
deleted file mode 100644
index 87ac4c5..0000000
---
a/s2counter_loader/src/test/scala/org/apache/s2graph/counter/loader/stream/RankingCounterStreamingSpec.scala
+++ /dev/null
@@ -1,470 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.s2graph.counter.loader.stream
-
-import org.apache.s2graph.core.Management
-import org.apache.s2graph.core.mysqls.Label
-import org.apache.s2graph.counter.loader.counter.core.CounterFunctions
-import org.apache.s2graph.counter.loader.models.DefaultCounterModel
-import org.apache.s2graph.spark.config.S2ConfigFactory
-import org.apache.s2graph.spark.spark.HashMapParam
-import org.apache.spark.streaming.{Seconds, StreamingContext}
-import org.apache.spark.{SparkConf, SparkContext}
-import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
-import play.api.libs.json.Json
-import CounterFunctions.HashMapAccumulable
-import s2.counter.core.TimedQualifier.IntervalUnit
-import s2.counter.core._
-import s2.counter.core.v2.{ExactStorageGraph, GraphOperation,
RankingStorageGraph}
-import s2.helper.CounterAdmin
-import s2.models.{Counter, DBModel}
-
-import scala.collection.mutable.{HashMap => MutableHashMap}
-import scala.concurrent.ExecutionContext.Implicits.global
-import scala.util.{Failure, Success}
-
-/**
- * Created by hsleep([email protected]) on 15. 6. 17..
- */
-class RankingCounterStreamingSpec extends FlatSpec with BeforeAndAfterAll with
Matchers {
- private val master = "local[2]"
- private val appName = "ranking_counter_streaming"
- private val batchDuration = Seconds(1)
-
- private var sc: SparkContext = _
- private var ssc: StreamingContext = _
-
- val admin = new CounterAdmin(S2ConfigFactory.config)
- val graphOp = new GraphOperation(S2ConfigFactory.config)
- val s2config = new S2CounterConfig(S2ConfigFactory.config)
-
- val exactCounter = new ExactCounter(S2ConfigFactory.config, new
ExactStorageGraph(S2ConfigFactory.config))
- val rankingCounter = new RankingCounter(S2ConfigFactory.config, new
RankingStorageGraph(S2ConfigFactory.config))
-
- val service = "test"
- val action = "test_case"
- val action_base = "test_case_base"
- val action_rate = "test_case_rate"
- val action_rate_threshold = "test_case_rate_threshold"
- val action_trend = "test_case_trend"
-
- override def beforeAll(): Unit = {
- DBModel.initialize(S2ConfigFactory.config)
-
- val conf = new SparkConf()
- .setMaster(master)
- .setAppName(appName)
-
- ssc = new StreamingContext(conf, batchDuration)
-
- sc = ssc.sparkContext
-
- admin.setupCounterOnGraph()
-
- // create test_case label
- Management.createService(service, s2config.HBASE_ZOOKEEPER_QUORUM,
s"${service}_dev", 1, None, "gz")
- if (Label.findByName(action, useCache = false).isEmpty) {
- val strJs =
- s"""
- |{
- | "label": "$action",
- | "srcServiceName": "$service",
- | "srcColumnName": "src",
- | "srcColumnType": "string",
- | "tgtServiceName": "$service",
- | "tgtColumnName": "$action",
- | "tgtColumnType": "string",
- | "indices": [
- | ],
- | "props": [
- | ]
- |}
- """.stripMargin
- graphOp.createLabel(Json.parse(strJs))
- }
- if (Label.findByName(action_base, useCache = false).isEmpty) {
- val strJs =
- s"""
- |{
- | "label": "$action_base",
- | "srcServiceName": "$service",
- | "srcColumnName": "src",
- | "srcColumnType": "string",
- | "tgtServiceName": "$service",
- | "tgtColumnName": "$action",
- | "tgtColumnType": "string",
- | "indices": [
- | ],
- | "props": [
- | ]
- |}
- """.stripMargin
- graphOp.createLabel(Json.parse(strJs))
- }
-
- // action
- admin.deleteCounter(service, action).foreach {
- case Success(v) =>
- case Failure(ex) =>
- println(s"$ex")
- }
- admin.createCounter(Counter(useFlag = true, 2, service, action,
Counter.ItemType.STRING, autoComb = true, "", useRank = true))
- val policy = DefaultCounterModel.findByServiceAction(service, action).get
-
- // action_base
- admin.deleteCounter(service, action_base).foreach {
- case Success(v) =>
- case Failure(ex) =>
- println(s"$ex")
- }
- admin.createCounter(Counter(useFlag = true, 2, service, action_base,
Counter.ItemType.STRING, autoComb = true, "", useRank = true))
- val basePolicy = DefaultCounterModel.findByServiceAction(service,
action_base).get
-
- // action_rate
- admin.deleteCounter(service, action_rate).foreach {
- case Success(v) =>
- case Failure(ex) =>
- println(s"$ex")
- }
- admin.createCounter(Counter(useFlag = true, 2, service, action_rate,
Counter.ItemType.STRING, autoComb = true, "gender,p1", useRank = true,
- rateActionId = Some(policy.id), rateBaseId = Some(basePolicy.id)))
-
- // action_rate_threshold
- admin.deleteCounter(service, action_rate_threshold).foreach {
- case Success(v) =>
- case Failure(ex) =>
- println(s"$ex")
- }
- admin.createCounter(Counter(useFlag = true, 2, service,
action_rate_threshold, Counter.ItemType.STRING, autoComb = true, "gender,p1",
useRank = true,
- rateActionId = Some(policy.id), rateBaseId = Some(basePolicy.id),
rateThreshold = Some(3)))
-
- // action_trend
- admin.deleteCounter(service, action_trend).foreach {
- case Success(v) =>
- case Failure(ex) =>
- println(s"$ex")
- }
- admin.createCounter(Counter(useFlag = true, 2, service, action_trend,
Counter.ItemType.STRING, autoComb = true, "p1", useRank = true,
- rateActionId = Some(policy.id), rateBaseId = Some(policy.id)))
- }
-
- override def afterAll(): Unit = {
- admin.deleteCounter(service, action)
- admin.deleteCounter(service, action_base)
- admin.deleteCounter(service, action_rate)
- admin.deleteCounter(service, action_rate_threshold)
- admin.deleteCounter(service, action_trend)
- if (ssc != null) {
- ssc.stop()
- }
- }
-
- "RankingCounterStreaming" should "update" in {
- val policy = DefaultCounterModel.findByServiceAction(service, action).get
-// val basePolicy = DefaultCounterModel.findByServiceAction(service,
action_base).get
-
- rankingCounter.ready(policy) should equal (true)
- val data =
- s"""
-
|{"success":true,"policyId":${policy.id},"item":"1","results":[{"interval":"M","dimension":"","ts":1433084400000,"value":1,"result":3}]}
-
|{"success":false,"policyId":${policy.id},"item":"2","results":[{"interval":"M","dimension":"","ts":1433084400000,"value":1,"result":2}]}
-
|{"success":true,"policyId":${policy.id},"item":"3","results":[{"interval":"M","dimension":"","ts":1433084400000,"value":1,"result":1}]}
-
|{"success":true,"policyId":${policy.id},"item":"3","results":[{"interval":"M","dimension":"","ts":1433084400000,"value":1,"result":2}]}
-
|{"success":true,"policyId":${policy.id},"item":"4","results":[{"interval":"M","dimension":"","ts":1433084400000,"value":1,"result":1}]}
- """.stripMargin.trim
- // println(data)
- val rdd = sc.parallelize(Seq(("", data)))
-
- // rdd.foreachPartition { part =>
- // part.foreach(println)
- // }
-
- val result = CounterFunctions.makeRankingRdd(rdd, 2).collect().toMap
-
- // result.foreachPartition { part =>
- // part.foreach(println)
- // }
-
- val acc: HashMapAccumulable = sc.accumulable(MutableHashMap.empty[String,
Long], "Throughput")(HashMapParam[String, Long](_ + _))
-
- result should not be empty
- val rankKey = RankingKey(policy.id, policy.version,
ExactQualifier(TimedQualifier("M", 1433084400000L), ""))
- result should contain (rankKey -> Map(
- "1" -> RankingValue(3, 1),
- "3" -> RankingValue(2, 2),
- "4" -> RankingValue(1, 1)
- ))
-
- val key = RankingKey(policy.id, policy.version,
ExactQualifier(TimedQualifier("M", 1433084400000L), ""))
- val value = result.get(key)
-
- value should not be empty
- value.get.get("1").get should equal (RankingValue(3, 1))
- value.get.get("2") shouldBe empty
- value.get.get("3").get should equal (RankingValue(2, 2))
-
- rankingCounter.ready(policy) should equal (true)
-
- // delete, update and get
- rankingCounter.delete(key)
- Thread.sleep(1000)
- CounterFunctions.updateRankingCounter(Seq((key, value.get)), acc)
- Thread.sleep(1000)
- val rst = rankingCounter.getTopK(key)
-
- rst should not be empty
-// rst.get.totalScore should equal(4f)
- rst.get.values should contain allOf(("3", 2d), ("4", 1d), ("1", 3d))
- }
-
-// "rate by base" >> {
-// val data =
-// """
-//
|{"success":true,"policyId":42,"item":"2","results":[{"interval":"M","dimension":"","ts":1433084400000,"value":2,"result":4}]}
-// """.stripMargin.trim
-// val rdd = sc.parallelize(Seq(("", data)))
-//
-// val trxLogRdd = CounterFunctions.makeTrxLogRdd(rdd, 2).collect()
-// trxLogRdd.foreach { log =>
-// CounterFunctions.rateBaseRankingMapper(log) must not be empty
-// }
-//
-// true must_== true
-// }
-
- it should "update rate ranking counter" in {
- val policy = DefaultCounterModel.findByServiceAction(service, action).get
- val basePolicy = DefaultCounterModel.findByServiceAction(service,
action_base).get
- val ratePolicy = DefaultCounterModel.findByServiceAction(service,
action_rate).get
-
- // update base policy
- val eq = ExactQualifier(TimedQualifier("M", 1433084400000l), "")
- val exactKey = ExactKey(basePolicy, "1", checkItemType = true)
-
- // check base item count
- exactCounter.updateCount(basePolicy, Seq(
- (exactKey, Map(eq -> 2l))
- ))
- Thread.sleep(1000)
-
- // direct get
- val baseCount = exactCounter.getCount(basePolicy, "1",
Seq(IntervalUnit.MONTHLY), 1433084400000l, 1433084400000l, Map.empty[String,
Set[String]])
- baseCount should not be empty
- baseCount.get should equal (FetchedCountsGrouped(exactKey, Map(
- (eq.tq.q, Map.empty[String, String]) -> Map(eq-> 2l)
- )))
-
- // related get
- val relatedCount = exactCounter.getRelatedCounts(basePolicy, Seq("1" ->
Seq(eq)))
- relatedCount should not be empty
- relatedCount.get("1") should not be empty
- relatedCount.get("1").get should equal (Map(eq -> 2l))
-
- val data =
- s"""
-
|{"success":true,"policyId":${policy.id},"item":"2","results":[{"interval":"M","dimension":"","ts":1433084400000,"value":1,"result":1}]}
-
|{"success":true,"policyId":${policy.id},"item":"2","results":[{"interval":"M","dimension":"gender.M","ts":1433084400000,"value":1,"result":1}]}
-
|{"success":true,"policyId":${basePolicy.id},"item":"2","results":[{"interval":"M","dimension":"","ts":1433084400000,"value":2,"result":4}]}
-
|{"success":true,"policyId":${basePolicy.id},"item":"2","results":[{"interval":"M","dimension":"gender.M","ts":1433084400000,"value":2,"result":4}]}
-
|{"success":true,"policyId":${policy.id},"item":"2","results":[{"interval":"M","dimension":"p1.1","ts":1433084400000,"value":1,"result":1}]}
-
|{"success":true,"policyId":${policy.id},"item":"1","results":[{"interval":"M","dimension":"","ts":1433084400000,"value":1,"result":1}]}
-
|{"success":true,"policyId":${basePolicy.id},"item":"2","results":[{"interval":"M","dimension":"p1.1","ts":1433084400000,"value":2,"result":4}]}
-
|{"success":true,"policyId":${policy.id},"item":"1","results":[{"interval":"M","dimension":"","ts":1433084400000,"value":1,"result":2}]}
- """.stripMargin.trim
- // println(data)
- val rdd = sc.parallelize(Seq(("", data)))
-
- // rdd.foreachPartition { part =>
- // part.foreach(println)
- // }
-
- val trxLogRdd = CounterFunctions.makeTrxLogRdd(rdd, 2)
- trxLogRdd.count() should equal (data.trim.split('\n').length)
-
- val itemRankingRdd = CounterFunctions.makeItemRankingRdd(trxLogRdd, 2)
- itemRankingRdd.foreach(println)
-
- val result = CounterFunctions.rateRankingCount(itemRankingRdd,
2).collect().toMap.filterKeys(key => key.policyId == ratePolicy.id)
- result.foreach(println)
- result should have size 3
-
- val acc: HashMapAccumulable = sc.accumulable(MutableHashMap.empty[String,
Long], "Throughput")(HashMapParam[String, Long](_ + _))
-
- // rate ranking
- val key = RankingKey(ratePolicy.id, 2, ExactQualifier(TimedQualifier("M",
1433084400000L), ""))
- val value = result.get(key)
-
-// println(key, value)
-
- value should not be empty
- value.get.get("1") should not be empty
- value.get.get("1").get should equal (RankingValue(1, 0))
- value.get.get("2").get should equal (RankingValue(0.25, 0))
-
- val key2 = RankingKey(ratePolicy.id, 2, ExactQualifier(TimedQualifier("M",
1433084400000L), "p1.1"))
- val value2 = result.get(key2)
-
-// println(key2, value2)
-
- val values = value.map(v => (key, v)).toSeq ++ value2.map(v => (key2,
v)).toSeq
- println(s"values: $values")
-
- // delete, update and get
- rankingCounter.delete(key)
- rankingCounter.delete(key2)
- Thread.sleep(1000)
- CounterFunctions.updateRankingCounter(values, acc)
- // for update graph
- Thread.sleep(1000)
-
- val rst = rankingCounter.getTopK(key)
- rst should not be empty
- rst.get.values should equal (Seq(("1", 1d), ("2", 0.25d)))
-
- val rst2 = rankingCounter.getTopK(key2)
- rst2 should not be empty
- rst2.get.values should equal (Seq(("2", 0.25d)))
- }
-
- it should "update rate ranking counter with threshold" in {
- val policy = DefaultCounterModel.findByServiceAction(service, action).get
- val basePolicy = DefaultCounterModel.findByServiceAction(service,
action_base).get
- val ratePolicy = DefaultCounterModel.findByServiceAction(service,
action_rate_threshold).get
-
- val data =
- s"""
-
|{"success":true,"policyId":${policy.id},"item":"1","results":[{"interval":"M","dimension":"","ts":1433084400000,"value":1,"result":1}]}
-
|{"success":true,"policyId":${policy.id},"item":"1","results":[{"interval":"M","dimension":"","ts":1433084400000,"value":1,"result":2}]}
-
|{"success":true,"policyId":${policy.id},"item":"2","results":[{"interval":"M","dimension":"","ts":1433084400000,"value":1,"result":1}]}
-
|{"success":true,"policyId":${policy.id},"item":"2","results":[{"interval":"M","dimension":"gender.M","ts":1433084400000,"value":1,"result":1}]}
-
|{"success":true,"policyId":${basePolicy.id},"item":"2","results":[{"interval":"M","dimension":"","ts":1433084400000,"value":2,"result":4}]}
-
|{"success":true,"policyId":${basePolicy.id},"item":"2","results":[{"interval":"M","dimension":"gender.M","ts":1433084400000,"value":2,"result":4}]}
- """.stripMargin.trim
- // println(data)
- val rdd = sc.parallelize(Seq(("", data)))
-
- // rdd.foreachPartition { part =>
- // part.foreach(println)
- // }
-
- val trxLogRdd = CounterFunctions.makeTrxLogRdd(rdd, 2)
- trxLogRdd.count() should equal (data.trim.split('\n').length)
-
- val itemRankingRdd = CounterFunctions.makeItemRankingRdd(trxLogRdd, 2)
- itemRankingRdd.foreach(println)
-
- val result = CounterFunctions.rateRankingCount(itemRankingRdd,
2).collect().toMap.filterKeys(key => key.policyId == ratePolicy.id)
- result.foreach(println)
- result should have size 2
-
- val acc: HashMapAccumulable = sc.accumulable(MutableHashMap.empty[String,
Long], "Throughput")(HashMapParam[String, Long](_ + _))
-
- // rate ranking
- val key = RankingKey(ratePolicy.id, 2, ExactQualifier(TimedQualifier("M",
1433084400000L), ""))
- val value = result.get(key)
-
- value should not be empty
- value.get.get("1") should be (None)
- value.get.get("2").get should equal (RankingValue(0.25, 0))
-
- // delete, update and get
- rankingCounter.delete(key)
- Thread.sleep(1000)
- CounterFunctions.updateRankingCounter(Seq((key, value.get)), acc)
- Thread.sleep(1000)
- val rst = rankingCounter.getTopK(key)
-
- rst should not be empty
- rst.get.values should equal (Seq(("2", 0.25d)))
- }
-
- it should "update trend ranking counter" in {
- val policy = DefaultCounterModel.findByServiceAction(service, action).get
- val trendPolicy = DefaultCounterModel.findByServiceAction(service,
action_trend).get
-
- val exactKey1 = ExactKey(policy, "1", checkItemType = true)
- val exactKey2 = ExactKey(policy, "2", checkItemType = true)
- // update old key value
- val tq1 = TimedQualifier("M", 1435676400000l)
- val tq2 = TimedQualifier("M", 1427814000000l)
- exactCounter.updateCount(policy, Seq(
- exactKey1 -> Map(ExactQualifier(tq1.add(-1), "") -> 1l,
ExactQualifier(tq2.add(-1), "") -> 92l)
- ))
- val eq1 = ExactQualifier(tq1, "")
- val eq2 = ExactQualifier(tq2, "")
-
- val oldCount = exactCounter.getPastCounts(policy, Seq("1" -> Seq(eq1,
eq2), "2" -> Seq(eq1, eq1.copy(dimension = "gender.M"))))
- oldCount should not be empty
- oldCount.get("1").get should equal(Map(eq1 -> 1l, eq2 -> 92l))
- oldCount.get("2") should be (None)
-
- val data =
- s"""
-
|{"success":true,"policyId":${policy.id},"item":"1","results":[{"interval":"M","dimension":"","ts":1435676400000,"value":1,"result":1}]}
-
|{"success":true,"policyId":${policy.id},"item":"1","results":[{"interval":"M","dimension":"","ts":1435676400000,"value":1,"result":2}]}
-
|{"success":true,"policyId":${policy.id},"item":"2","results":[{"interval":"M","dimension":"","ts":1435676400000,"value":1,"result":1}]}
-
|{"success":true,"policyId":${policy.id},"item":"2","results":[{"interval":"M","dimension":"gender.M","ts":1435676400000,"value":1,"result":1}]}
-
|{"success":true,"policyId":${policy.id},"item":"1","results":[{"interval":"M","dimension":"","ts":1427814000000,"value":1,"result":92}]}
- """.stripMargin.trim
- // println(data)
- val rdd = sc.parallelize(Seq(("", data)))
-
- // rdd.foreachPartition { part =>
- // part.foreach(println)
- // }
-
- val trxLogRdd = CounterFunctions.makeTrxLogRdd(rdd, 2)
- trxLogRdd.count() should equal (data.trim.split('\n').length)
-
- val itemRankingRdd = CounterFunctions.makeItemRankingRdd(trxLogRdd, 2)
- itemRankingRdd.foreach(println)
-
- val result = CounterFunctions.trendRankingCount(itemRankingRdd,
2).collect().toMap
- result.foreach(println)
- // dimension gender.M is ignored, because gender is not defined dimension
in trend policy.
- result should have size 2
-
- val acc: HashMapAccumulable = sc.accumulable(MutableHashMap.empty[String,
Long], "Throughput")(HashMapParam[String, Long](_ + _))
-
- // trend ranking
- val key = RankingKey(trendPolicy.id, 2, ExactQualifier(TimedQualifier("M",
1435676400000L), ""))
- val value = result.get(key)
-
- value should not be empty
- value.get.get("1").get should equal (RankingValue(2, 0))
- value.get.get("2").get should equal (RankingValue(1, 0))
-
- val key2 = RankingKey(trendPolicy.id, 2,
ExactQualifier(TimedQualifier("M", 1427814000000L), ""))
- val value2 = result.get(key2)
-
- value2 should not be empty
- value2.get.get("1").get should equal (RankingValue(1, 0))
-
- // delete, update and get
- rankingCounter.delete(key)
- Thread.sleep(1000)
- CounterFunctions.updateRankingCounter(Seq((key, value.get)), acc)
- Thread.sleep(1000)
- val rst = rankingCounter.getTopK(key)
-
- rst should not be empty
- rst.get.values should equal (Seq("1" -> 2, "2" -> 1))
- }
-}