Repository: incubator-s2graph
Updated Branches:
  refs/heads/master e9bb727f1 -> 4c2554145


[S2GRAPH-60]: Add divide operation to scorePropagateOp.

  Add a divide operation to score propagation (contributed by @wishoping).

JIRA:
  [S2GRAPH-60] https://issues.apache.org/jira/browse/S2GRAPH-60

Pull Request:
  Closes #43

Authors:
  Junki Kim: [email protected]


Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/4c255414
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/4c255414
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/4c255414

Branch: refs/heads/master
Commit: 4c2554145695abcd9506461c7498f3331586db6e
Parents: e9bb727
Author: DO YUNG YOON <[email protected]>
Authored: Thu Mar 17 17:35:30 2016 +0900
Committer: DO YUNG YOON <[email protected]>
Committed: Thu Mar 17 17:35:30 2016 +0900

----------------------------------------------------------------------
 CHANGES                                         |   3 +
 .../org/apache/s2graph/core/QueryParam.scala    |   6 +
 .../s2graph/core/rest/RequestParser.scala       |   2 +
 .../apache/s2graph/core/storage/Storage.scala   |   3 +
 .../s2graph/core/Integrate/QueryTest.scala      | 196 +++++++++++++++++++
 5 files changed, 210 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/4c255414/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index e1109e2..9ce9d71 100644
--- a/CHANGES
+++ b/CHANGES
@@ -37,6 +37,9 @@ Release 0.12.1 - unreleased
 
     S2GRAPH-50: Provide new HBase Storage Schema (Committed by DOYUNG YOON).
 
+    S2GRAPH-60: Add divide operation to scorePropagateOp.
+               (Contributed by Junki Kim<[email protected]>, committed by DOYUNG YOON).
+
   IMPROVEMENT
 
     S2GRAPH-14: Abstract HBase specific methods in Management and Label (Committed by DOYUNG YOON).

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/4c255414/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala 
b/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala
index 4c9175c..a5d7517 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala
@@ -321,6 +321,7 @@ case class QueryParam(labelWithDir: LabelWithDirection, 
timestamp: Long = System
   var timeDecay: Option[TimeDecay] = None
   var transformer: EdgeTransformer = EdgeTransformer(this, 
EdgeTransformer.DefaultJson)
   var scorePropagateOp: String = "multiply"
+  var scorePropagateShrinkage: Long = 500
   var exclude = false
   var include = false
   var shouldNormalize= false
@@ -479,6 +480,11 @@ case class QueryParam(labelWithDir: LabelWithDirection, 
timestamp: Long = System
     this
   }
 
+  def scorePropagateShrinkage(scorePropagateShrinkage: Long): QueryParam = {
+    this.scorePropagateShrinkage = scorePropagateShrinkage
+    this
+  }
+
   def tgtVertexInnerIdOpt(other: Option[InnerValLike]): QueryParam = {
     this.tgtVertexInnerIdOpt = other
     this

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/4c255414/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala 
b/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala
index f8671a7..afda6f9 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala
@@ -397,6 +397,7 @@ class RequestParser(config: Config) extends JSONParser {
       val outputField = (labelGroup \ "outputField").asOpt[String].map(s => 
Json.arr(Json.arr(s)))
       val transformer = if (outputField.isDefined) outputField else 
(labelGroup \ "transform").asOpt[JsValue]
       val scorePropagateOp = (labelGroup \ 
"scorePropagateOp").asOpt[String].getOrElse("multiply")
+      val scorePropagateShrinkage = (labelGroup \ 
"scorePropagateShrinkage").asOpt[Long].getOrElse(500l)
       val sample = (labelGroup \ "sample").asOpt[Int].getOrElse(-1)
       val shouldNormalize = (labelGroup \ 
"normalize").asOpt[Boolean].getOrElse(false)
       val cursorOpt = (labelGroup \ "cursor").asOpt[String]
@@ -422,6 +423,7 @@ class RequestParser(config: Config) extends JSONParser {
         .threshold(threshold)
         .transformer(transformer)
         .scorePropagateOp(scorePropagateOp)
+        .scorePropagateShrinkage(scorePropagateShrinkage)
         .shouldNormalize(shouldNormalize)
         .cursorOpt(cursorOpt)
     }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/4c255414/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala 
b/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
index a4af95a..fd968ad 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
@@ -711,6 +711,9 @@ abstract class Storage[R](val config: Config)(implicit ec: 
ExecutionContext) {
         val currentScore =
           queryParam.scorePropagateOp match {
             case "plus" => edge.rank(queryParam.rank) + prevScore
+            case "divide" =>
+              if ((prevScore + queryParam.scorePropagateShrinkage) == 0) 0
+              else edge.rank(queryParam.rank) / (prevScore + 
queryParam.scorePropagateShrinkage)
             case _ => edge.rank(queryParam.rank) * prevScore
           }
         EdgeWithScore(edge, currentScore)

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/4c255414/s2core/src/test/scala/org/apache/s2graph/core/Integrate/QueryTest.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/test/scala/org/apache/s2graph/core/Integrate/QueryTest.scala 
b/s2core/src/test/scala/org/apache/s2graph/core/Integrate/QueryTest.scala
index 7eaa2e2..8bdc99b 100644
--- a/s2core/src/test/scala/org/apache/s2graph/core/Integrate/QueryTest.scala
+++ b/s2core/src/test/scala/org/apache/s2graph/core/Integrate/QueryTest.scala
@@ -859,6 +859,202 @@ class QueryTest extends IntegrateCommon with 
BeforeAndAfterEach {
     results.size should be(4)
   }
 
+  test("scorePropagateOp test") {
+    def queryWithPropertyOp(id: String, op: String, shrinkageVal: Long) = 
Json.parse(
+      s"""{
+         |  "limit": 10,
+         |  "groupBy": ["from"],
+         |  "duplicate": "sum",
+         |  "srcVertices": [
+         |    {
+         |      "serviceName": "$testServiceName",
+         |      "columnName": "$testColumnName",
+         |      "id": $id
+         |    }
+         |  ],
+         |  "steps": [
+         |    {
+         |      "step": [
+         |        {
+         |          "label": "$testLabelName",
+         |          "direction": "out",
+         |          "offset": 0,
+         |          "limit": 10,
+         |          "groupBy": ["from"],
+         |          "duplicate": "sum",
+         |          "index": "idx_1",
+         |          "scoring": {
+         |            "weight":1,
+         |            "time": 0
+         |          },
+         |          "transform": [["_from"]]
+         |        }
+         |      ]
+         |    }, {
+         |      "step": [
+         |        {
+         |          "label": "$testLabelName2",
+         |          "direction": "out",
+         |          "offset": 0,
+         |          "limit": 10,
+         |          "scorePropagateOp": "$op",
+         |          "scorePropagateShrinkage": $shrinkageVal
+         |        }
+         |      ]
+         |    }
+         |  ]
+         |}
+       """.stripMargin
+    )
+
+    def querySingleVertexWithOp(id: String, op: String, shrinkageVal: Long) = 
Json.parse(
+      s"""{
+         |  "limit": 10,
+         |  "groupBy": ["from"],
+         |  "duplicate": "sum",
+         |  "srcVertices": [
+         |    {
+         |      "serviceName": "$testServiceName",
+         |      "columnName": "$testColumnName",
+         |      "id": $id
+         |    }
+         |  ],
+         |  "steps": [
+         |    {
+         |      "step": [
+         |        {
+         |          "label": "$testLabelName",
+         |          "direction": "out",
+         |          "offset": 0,
+         |          "limit": 10,
+         |          "groupBy": ["from"],
+         |          "duplicate": "countSum",
+         |          "transform": [["_from"]]
+         |        }
+         |      ]
+         |    }, {
+         |      "step": [
+         |        {
+         |          "label": "$testLabelName2",
+         |          "direction": "out",
+         |          "offset": 0,
+         |          "limit": 10,
+         |          "scorePropagateOp": "$op",
+         |          "scorePropagateShrinkage": $shrinkageVal
+         |        }
+         |      ]
+         |    }
+         |  ]
+         |}
+       """.stripMargin
+    )
+
+    def queryMultiVerticesWithOp(id: String, id2: String, op: String, 
shrinkageVal: Long) = Json.parse(
+      s"""{
+         |  "limit": 10,
+         |  "groupBy": ["from"],
+         |  "duplicate": "sum",
+         |  "srcVertices": [
+         |    {
+         |      "serviceName": "$testServiceName",
+         |      "columnName": "$testColumnName",
+         |      "ids": [$id, $id2]
+         |    }
+         |  ],
+         |  "steps": [
+         |    {
+         |      "step": [
+         |        {
+         |          "label": "$testLabelName",
+         |          "direction": "out",
+         |          "offset": 0,
+         |          "limit": 10,
+         |          "groupBy": ["from"],
+         |          "duplicate": "countSum",
+         |          "transform": [["_from"]]
+         |        }
+         |      ]
+         |    }, {
+         |      "step": [
+         |        {
+         |          "label": "$testLabelName2",
+         |          "direction": "out",
+         |          "offset": 0,
+         |          "limit": 10,
+         |          "scorePropagateOp": "$op",
+         |          "scorePropagateShrinkage": $shrinkageVal
+         |        }
+         |      ]
+         |    }
+         |  ]
+         |}
+       """.stripMargin
+    )
+    val testId = "-30000"
+    val testId2 = "-4000"
+
+    val bulkEdges = Seq(
+      toEdge(1, insert, e, testId, 101, testLabelName, Json.obj(weight -> 
-10)),
+      toEdge(1, insert, e, testId, 102, testLabelName, Json.obj(weight -> 
-10)),
+      toEdge(1, insert, e, testId, 103, testLabelName, Json.obj(weight -> 
-10)),
+      toEdge(1, insert, e, testId, 102, testLabelName2, Json.obj(weight -> 
10)),
+      toEdge(1, insert, e, testId, 103, testLabelName2, Json.obj(weight -> 
10)),
+      toEdge(1, insert, e, testId, 104, testLabelName2, Json.obj(weight -> 
10)),
+      toEdge(1, insert, e, testId, 105, testLabelName2, Json.obj(weight -> 
10)),
+
+      toEdge(1, insert, e, testId2, 101, testLabelName, Json.obj(weight -> 
-10)),
+      toEdge(1, insert, e, testId2, 102, testLabelName, Json.obj(weight -> 
-10)),
+      toEdge(1, insert, e, testId2, 103, testLabelName, Json.obj(weight -> 
-10)),
+      toEdge(1, insert, e, testId2, 102, testLabelName2, Json.obj(weight -> 
10)),
+      toEdge(1, insert, e, testId2, 105, testLabelName2, Json.obj(weight -> 
10))
+    )
+    insertEdgesSync(bulkEdges: _*)
+
+    val firstStepEdgeCount = 3l
+    val secondStepEdgeCount = 4l
+
+    var shrinkageVal = 10l
+    var rs = getEdgesSync(querySingleVertexWithOp(testId, "divide", 
shrinkageVal))
+    logger.debug(Json.prettyPrint(rs))
+    var results = (rs \ "results").as[List[JsValue]]
+    results.size should be(1)
+    var scoreSum = secondStepEdgeCount.toDouble / (firstStepEdgeCount.toDouble 
+ shrinkageVal)
+    (results(0) \ "scoreSum").as[Double] should be(scoreSum)
+
+    rs = getEdgesSync(queryMultiVerticesWithOp(testId, testId2, "divide", 
shrinkageVal))
+    logger.debug(Json.prettyPrint(rs))
+    results = (rs \ "results").as[List[JsValue]]
+    results.size should be(2)
+    scoreSum = secondStepEdgeCount.toDouble / (firstStepEdgeCount.toDouble + 
shrinkageVal)
+    (results(0) \ "scoreSum").as[Double] should be(scoreSum)
+    scoreSum = 2.toDouble / (3.toDouble + shrinkageVal)
+    (results(1) \ "scoreSum").as[Double] should be(scoreSum)
+
+    // check the divide-by-zero case (prevScore + shrinkage == 0 should yield a score of 0)
+    shrinkageVal = 30l
+    rs = getEdgesSync(queryWithPropertyOp(testId, "divide", shrinkageVal))
+    logger.debug(Json.prettyPrint(rs))
+    results = (rs \ "results").as[List[JsValue]]
+    results.size should be(1)
+    (results(0) \ "scoreSum").as[Double] should be(0)
+
+    // "plus" operation
+    rs = getEdgesSync(querySingleVertexWithOp(testId, "plus", shrinkageVal))
+    logger.debug(Json.prettyPrint(rs))
+    results = (rs \ "results").as[List[JsValue]]
+    results.size should be(1)
+    scoreSum = (firstStepEdgeCount + 1) * secondStepEdgeCount
+    (results(0) \ "scoreSum").as[Long] should be(scoreSum)
+
+    // "multiply" operation
+    rs = getEdgesSync(querySingleVertexWithOp(testId, "multiply", 
shrinkageVal))
+    logger.debug(Json.prettyPrint(rs))
+    results = (rs \ "results").as[List[JsValue]]
+    results.size should be(1)
+    scoreSum = (firstStepEdgeCount * 1) * secondStepEdgeCount
+    (results(0) \ "scoreSum").as[Long] should be(scoreSum)
+  }
+
   def querySingle(id: Int, offset: Int = 0, limit: Int = 100) = Json.parse(
     s"""
           { "srcVertices": [

Reply via email to