Repository: incubator-s2graph Updated Branches: refs/heads/master e9bb727f1 -> 4c2554145
[S2GRAPH-60]: Add divide operation to scorePropagateOp. add divide operation on score propagation. (contributed by @wishoping ) JIRA: [S2GRAPH-60] https://issues.apache.org/jira/browse/S2GRAPH-60 Pull Request: Closes #43 Authors: Junki Kim: [email protected] Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/4c255414 Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/4c255414 Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/4c255414 Branch: refs/heads/master Commit: 4c2554145695abcd9506461c7498f3331586db6e Parents: e9bb727 Author: DO YUNG YOON <[email protected]> Authored: Thu Mar 17 17:35:30 2016 +0900 Committer: DO YUNG YOON <[email protected]> Committed: Thu Mar 17 17:35:30 2016 +0900 ---------------------------------------------------------------------- CHANGES | 3 + .../org/apache/s2graph/core/QueryParam.scala | 6 + .../s2graph/core/rest/RequestParser.scala | 2 + .../apache/s2graph/core/storage/Storage.scala | 3 + .../s2graph/core/Integrate/QueryTest.scala | 196 +++++++++++++++++++ 5 files changed, 210 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/4c255414/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index e1109e2..9ce9d71 100644 --- a/CHANGES +++ b/CHANGES @@ -37,6 +37,9 @@ Release 0.12.1 - unreleased S2GRAPH-50: Provide new HBase Storage Schema (Committed by DOYUNG YOON). + S2GRAPH-60: Add divide operation to scorePropagateOp. + (Contributed by Junki Kim<[email protected]>, committed by DOYUNG YOON). + IMPROVEMENT S2GRAPH-14: Abstract HBase specific methods in Management and Label (Committed by DOYUNG YOON). http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/4c255414/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala b/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala index 4c9175c..a5d7517 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala @@ -321,6 +321,7 @@ case class QueryParam(labelWithDir: LabelWithDirection, timestamp: Long = System var timeDecay: Option[TimeDecay] = None var transformer: EdgeTransformer = EdgeTransformer(this, EdgeTransformer.DefaultJson) var scorePropagateOp: String = "multiply" + var scorePropagateShrinkage: Long = 500 var exclude = false var include = false var shouldNormalize= false @@ -479,6 +480,11 @@ case class QueryParam(labelWithDir: LabelWithDirection, timestamp: Long = System this } + def scorePropagateShrinkage(scorePropagateShrinkage: Long): QueryParam = { + this.scorePropagateShrinkage = scorePropagateShrinkage + this + } + def tgtVertexInnerIdOpt(other: Option[InnerValLike]): QueryParam = { this.tgtVertexInnerIdOpt = other this http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/4c255414/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala b/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala index f8671a7..afda6f9 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala @@ -397,6 +397,7 @@ class RequestParser(config: Config) extends JSONParser { val outputField = (labelGroup \ "outputField").asOpt[String].map(s => Json.arr(Json.arr(s))) val transformer = if (outputField.isDefined) outputField else (labelGroup \ "transform").asOpt[JsValue] val scorePropagateOp = (labelGroup \ "scorePropagateOp").asOpt[String].getOrElse("multiply") + val scorePropagateShrinkage = (labelGroup \ "scorePropagateShrinkage").asOpt[Long].getOrElse(500l) val sample = (labelGroup \ "sample").asOpt[Int].getOrElse(-1) val shouldNormalize = (labelGroup \ "normalize").asOpt[Boolean].getOrElse(false) val cursorOpt = (labelGroup \ "cursor").asOpt[String] @@ -422,6 +423,7 @@ class RequestParser(config: Config) extends JSONParser { .threshold(threshold) .transformer(transformer) .scorePropagateOp(scorePropagateOp) + .scorePropagateShrinkage(scorePropagateShrinkage) .shouldNormalize(shouldNormalize) .cursorOpt(cursorOpt) } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/4c255414/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala index a4af95a..fd968ad 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala @@ -711,6 +711,9 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) { val currentScore = queryParam.scorePropagateOp match { case "plus" => edge.rank(queryParam.rank) + prevScore + case "divide" => + if ((prevScore + queryParam.scorePropagateShrinkage) == 0) 0 + else edge.rank(queryParam.rank) / (prevScore + queryParam.scorePropagateShrinkage) case _ => edge.rank(queryParam.rank) * prevScore } EdgeWithScore(edge, currentScore) http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/4c255414/s2core/src/test/scala/org/apache/s2graph/core/Integrate/QueryTest.scala ---------------------------------------------------------------------- diff --git a/s2core/src/test/scala/org/apache/s2graph/core/Integrate/QueryTest.scala b/s2core/src/test/scala/org/apache/s2graph/core/Integrate/QueryTest.scala index 7eaa2e2..8bdc99b 100644 --- a/s2core/src/test/scala/org/apache/s2graph/core/Integrate/QueryTest.scala +++ b/s2core/src/test/scala/org/apache/s2graph/core/Integrate/QueryTest.scala @@ -859,6 +859,202 @@ class QueryTest extends IntegrateCommon with BeforeAndAfterEach { results.size should be(4) } + test("scorePropagateOp test") { + def queryWithPropertyOp(id: String, op: String, shrinkageVal: Long) = Json.parse( + s"""{ + | "limit": 10, + | "groupBy": ["from"], + | "duplicate": "sum", + | "srcVertices": [ + | { + | "serviceName": "$testServiceName", + | "columnName": "$testColumnName", + | "id": $id + | } + | ], + | "steps": [ + | { + | "step": [ + | { + | "label": "$testLabelName", + | "direction": "out", + | "offset": 0, + | "limit": 10, + | "groupBy": ["from"], + | "duplicate": "sum", + | "index": "idx_1", + | "scoring": { + | "weight":1, + | "time": 0 + | }, + | "transform": [["_from"]] + | } + | ] + | }, { + | "step": [ + | { + | "label": "$testLabelName2", + | "direction": "out", + | "offset": 0, + | "limit": 10, + | "scorePropagateOp": "$op", + | "scorePropagateShrinkage": $shrinkageVal + | } + | ] + | } + | ] + |} + """.stripMargin + ) + + def querySingleVertexWithOp(id: String, op: String, shrinkageVal: Long) = Json.parse( + s"""{ + | "limit": 10, + | "groupBy": ["from"], + | "duplicate": "sum", + | "srcVertices": [ + | { + | "serviceName": "$testServiceName", + | "columnName": "$testColumnName", + | "id": $id + | } + | ], + | "steps": [ + | { + | "step": [ + | { + | "label": "$testLabelName", + | "direction": "out", + | "offset": 0, + | "limit": 10, + | "groupBy": ["from"], + | "duplicate": "countSum", + | "transform": [["_from"]] + | } + | ] + | }, { + | "step": [ + | { + | "label": "$testLabelName2", + | "direction": "out", + | "offset": 0, + | "limit": 10, + | "scorePropagateOp": "$op", + | "scorePropagateShrinkage": $shrinkageVal + | } + | ] + | } + | ] + |} + """.stripMargin + ) + + def queryMultiVerticesWithOp(id: String, id2: String, op: String, shrinkageVal: Long) = Json.parse( + s"""{ + | "limit": 10, + | "groupBy": ["from"], + | "duplicate": "sum", + | "srcVertices": [ + | { + | "serviceName": "$testServiceName", + | "columnName": "$testColumnName", + | "ids": [$id, $id2] + | } + | ], + | "steps": [ + | { + | "step": [ + | { + | "label": "$testLabelName", + | "direction": "out", + | "offset": 0, + | "limit": 10, + | "groupBy": ["from"], + | "duplicate": "countSum", + | "transform": [["_from"]] + | } + | ] + | }, { + | "step": [ + | { + | "label": "$testLabelName2", + | "direction": "out", + | "offset": 0, + | "limit": 10, + | "scorePropagateOp": "$op", + | "scorePropagateShrinkage": $shrinkageVal + | } + | ] + | } + | ] + |} + """.stripMargin + ) + val testId = "-30000" + val testId2 = "-4000" + + val bulkEdges = Seq( + toEdge(1, insert, e, testId, 101, testLabelName, Json.obj(weight -> -10)), + toEdge(1, insert, e, testId, 102, testLabelName, Json.obj(weight -> -10)), + toEdge(1, insert, e, testId, 103, testLabelName, Json.obj(weight -> -10)), + toEdge(1, insert, e, testId, 102, testLabelName2, Json.obj(weight -> 10)), + toEdge(1, insert, e, testId, 103, testLabelName2, Json.obj(weight -> 10)), + toEdge(1, insert, e, testId, 104, testLabelName2, Json.obj(weight -> 10)), + toEdge(1, insert, e, testId, 105, testLabelName2, Json.obj(weight -> 10)), + + toEdge(1, insert, e, testId2, 101, testLabelName, Json.obj(weight -> -10)), + toEdge(1, insert, e, testId2, 102, testLabelName, Json.obj(weight -> -10)), + toEdge(1, insert, e, testId2, 103, testLabelName, Json.obj(weight -> -10)), + toEdge(1, insert, e, testId2, 102, testLabelName2, Json.obj(weight -> 10)), + toEdge(1, insert, e, testId2, 105, testLabelName2, Json.obj(weight -> 10)) + ) + insertEdgesSync(bulkEdges: _*) + + val firstStepEdgeCount = 3l + val secondStepEdgeCount = 4l + + var shrinkageVal = 10l + var rs = getEdgesSync(querySingleVertexWithOp(testId, "divide", shrinkageVal)) + logger.debug(Json.prettyPrint(rs)) + var results = (rs \ "results").as[List[JsValue]] + results.size should be(1) + var scoreSum = secondStepEdgeCount.toDouble / (firstStepEdgeCount.toDouble + shrinkageVal) + (results(0) \ "scoreSum").as[Double] should be(scoreSum) + + rs = getEdgesSync(queryMultiVerticesWithOp(testId, testId2, "divide", shrinkageVal)) + logger.debug(Json.prettyPrint(rs)) + results = (rs \ "results").as[List[JsValue]] + results.size should be(2) + scoreSum = secondStepEdgeCount.toDouble / (firstStepEdgeCount.toDouble + shrinkageVal) + (results(0) \ "scoreSum").as[Double] should be(scoreSum) + scoreSum = 2.toDouble / (3.toDouble + shrinkageVal) + (results(1) \ "scoreSum").as[Double] should be(scoreSum) + + // check for divide zero case + shrinkageVal = 30l + rs = getEdgesSync(queryWithPropertyOp(testId, "divide", shrinkageVal)) + logger.debug(Json.prettyPrint(rs)) + results = (rs \ "results").as[List[JsValue]] + results.size should be(1) + (results(0) \ "scoreSum").as[Double] should be(0) + + // "plus" operation + rs = getEdgesSync(querySingleVertexWithOp(testId, "plus", shrinkageVal)) + logger.debug(Json.prettyPrint(rs)) + results = (rs \ "results").as[List[JsValue]] + results.size should be(1) + scoreSum = (firstStepEdgeCount + 1) * secondStepEdgeCount + (results(0) \ "scoreSum").as[Long] should be(scoreSum) + + // "multiply" operation + rs = getEdgesSync(querySingleVertexWithOp(testId, "multiply", shrinkageVal)) + logger.debug(Json.prettyPrint(rs)) + results = (rs \ "results").as[List[JsValue]] + results.size should be(1) + scoreSum = (firstStepEdgeCount * 1) * secondStepEdgeCount + (results(0) \ "scoreSum").as[Long] should be(scoreSum) + } + def querySingle(id: Int, offset: Int = 0, limit: Int = 100) = Json.parse( s""" { "srcVertices": [
