Repository: incubator-s2graph Updated Branches: refs/heads/master 320c36cf0 -> b58e6f99e
[S2GRAPH-35] Provide normalize option on query. provide normalize operations on fetched QueryResult. JIRA: [S2GRAPH-35] https://issues.apache.org/jira/browse/S2GRAPH-35 Pull Request: Closes #18 Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/b58e6f99 Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/b58e6f99 Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/b58e6f99 Branch: refs/heads/master Commit: b58e6f99e627137a9179870cedf6ebf3bef4d1e7 Parents: 320c36c Author: DO YUNG YOON <[email protected]> Authored: Tue Feb 23 16:28:01 2016 +0900 Committer: DO YUNG YOON <[email protected]> Committed: Tue Feb 23 16:28:01 2016 +0900 ---------------------------------------------------------------------- CHANGES | 2 ++ .../com/kakao/s2graph/core/QueryParam.scala | 6 ++++++ .../kakao/s2graph/core/rest/RequestParser.scala | 2 ++ .../storage/hbase/AsynchbaseQueryBuilder.scala | 22 ++++++++++++++++---- 4 files changed, 28 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b58e6f99/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index 620ca62..419de68 100644 --- a/CHANGES +++ b/CHANGES @@ -6,6 +6,8 @@ Release 0.12.1 - unreleased S2GRAPH-34: Provide option to select which field in edge's properties to run timeDecay function (Committed by DOYUNG YOON). + S2GRAPH-35: Provide normalize option on query (Committed by DOYUNG YOON). + IMPROVEMENT S2GRAPH-14: Abstract HBase specific methods in Management and Label (Committed by DOYUNG YOON). http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b58e6f99/s2core/src/main/scala/com/kakao/s2graph/core/QueryParam.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/QueryParam.scala b/s2core/src/main/scala/com/kakao/s2graph/core/QueryParam.scala index 7f52e1a..c3a6f17 100644 --- a/s2core/src/main/scala/com/kakao/s2graph/core/QueryParam.scala +++ b/s2core/src/main/scala/com/kakao/s2graph/core/QueryParam.scala @@ -268,6 +268,7 @@ case class QueryParam(labelWithDir: LabelWithDirection, timestamp: Long = System var scorePropagateOp: String = "multiply" var exclude = false var include = false + var shouldNormalize= false var columnRangeFilterMinBytes = Array.empty[Byte] var columnRangeFilterMaxBytes = Array.empty[Byte] @@ -455,6 +456,11 @@ case class QueryParam(labelWithDir: LabelWithDirection, timestamp: Long = System this } + def shouldNormalize(shouldNormalize: Boolean): QueryParam = { + this.shouldNormalize = shouldNormalize + this + } + def isSnapshotEdge = tgtVertexInnerIdOpt.isDefined override def toString = { http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b58e6f99/s2core/src/main/scala/com/kakao/s2graph/core/rest/RequestParser.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/rest/RequestParser.scala b/s2core/src/main/scala/com/kakao/s2graph/core/rest/RequestParser.scala index beaa151..141b2fe 100644 --- a/s2core/src/main/scala/com/kakao/s2graph/core/rest/RequestParser.scala +++ b/s2core/src/main/scala/com/kakao/s2graph/core/rest/RequestParser.scala @@ -298,6 +298,7 @@ class RequestParser(config: Config) extends JSONParser { val transformer = if (outputField.isDefined) outputField else (labelGroup \ "transform").asOpt[JsValue] val scorePropagateOp = (labelGroup \ "scorePropagateOp").asOpt[String].getOrElse("multiply") val sample = (labelGroup \ "sample").asOpt[Int].getOrElse(-1) + val shouldNormalize = (labelGroup \ "normalize").asOpt[Boolean].getOrElse(false) // FIXME: Order of command matter QueryParam(labelWithDir) @@ -321,6 +322,7 @@ class RequestParser(config: Config) extends JSONParser { .threshold(threshold) .transformer(transformer) .scorePropagateOp(scorePropagateOp) + .shouldNormalize(shouldNormalize) } } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b58e6f99/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/AsynchbaseQueryBuilder.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/AsynchbaseQueryBuilder.scala b/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/AsynchbaseQueryBuilder.scala index b78669c..5628b2e 100644 --- a/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/AsynchbaseQueryBuilder.scala +++ b/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/AsynchbaseQueryBuilder.scala @@ -122,16 +122,30 @@ class AsynchbaseQueryBuilder(storage: AsynchbaseStorage)(implicit ec: ExecutionC if (randoms.contains(idx)) samples = e :: samples idx += 1 } - samples.toSeq + samples } } + def normalize(edgeWithScores: Seq[EdgeWithScore]): Seq[EdgeWithScore] = { + val sum = edgeWithScores.foldLeft(0.0) { case (acc, cur) => acc + cur.score } + edgeWithScores.map { edgeWithScore => + edgeWithScore.copy(score = edgeWithScore.score / sum) + } + + } + def fetchInner(request: GetRequest) = { + storage.client.get(request) withCallback { kvs => val edgeWithScores = storage.toEdges(kvs.toSeq, queryRequest.queryParam, prevStepScore, isInnerCall, parentEdges) - val resultEdgesWithScores = if (queryRequest.queryParam.sample >= 0) { - sample(edgeWithScores, queryRequest.queryParam.sample) - } else edgeWithScores + val normalized = + if (queryRequest.queryParam.shouldNormalize) normalize(edgeWithScores) + else edgeWithScores + + val resultEdgesWithScores = + if (queryRequest.queryParam.sample >= 0) sample(normalized, queryRequest.queryParam.sample) + else normalized + QueryRequestWithResult(queryRequest, QueryResult(resultEdgesWithScores)) } recoverWith { ex => logger.error(s"fetchQueryParam failed. fallback return.", ex)
