Repository: incubator-s2graph
Updated Branches:
  refs/heads/master 320c36cf0 -> b58e6f99e


[S2GRAPH-35] Provide normalize option on query.

  provide normalize operations on fetched QueryResult.

JIRA:
  [S2GRAPH-35] https://issues.apache.org/jira/browse/S2GRAPH-35

Pull Request:
  Closes #18


Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/b58e6f99
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/b58e6f99
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/b58e6f99

Branch: refs/heads/master
Commit: b58e6f99e627137a9179870cedf6ebf3bef4d1e7
Parents: 320c36c
Author: DO YUNG YOON <[email protected]>
Authored: Tue Feb 23 16:28:01 2016 +0900
Committer: DO YUNG YOON <[email protected]>
Committed: Tue Feb 23 16:28:01 2016 +0900

----------------------------------------------------------------------
 CHANGES                                         |  2 ++
 .../com/kakao/s2graph/core/QueryParam.scala     |  6 ++++++
 .../kakao/s2graph/core/rest/RequestParser.scala |  2 ++
 .../storage/hbase/AsynchbaseQueryBuilder.scala  | 22 ++++++++++++++++----
 4 files changed, 28 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b58e6f99/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 620ca62..419de68 100644
--- a/CHANGES
+++ b/CHANGES
@@ -6,6 +6,8 @@ Release 0.12.1 - unreleased
     
     S2GRAPH-34: Provide option to select which field in edge's properties to 
run timeDecay function (Committed by DOYUNG YOON).
 
+    S2GRAPH-35: Provide normalize option on query (Committed by DOYUNG YOON).
+
   IMPROVEMENT
 
     S2GRAPH-14: Abstract HBase specific methods in Management and Label 
(Committed by DOYUNG YOON).

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b58e6f99/s2core/src/main/scala/com/kakao/s2graph/core/QueryParam.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/QueryParam.scala 
b/s2core/src/main/scala/com/kakao/s2graph/core/QueryParam.scala
index 7f52e1a..c3a6f17 100644
--- a/s2core/src/main/scala/com/kakao/s2graph/core/QueryParam.scala
+++ b/s2core/src/main/scala/com/kakao/s2graph/core/QueryParam.scala
@@ -268,6 +268,7 @@ case class QueryParam(labelWithDir: LabelWithDirection, 
timestamp: Long = System
   var scorePropagateOp: String = "multiply"
   var exclude = false
   var include = false
+  var shouldNormalize= false
 
   var columnRangeFilterMinBytes = Array.empty[Byte]
   var columnRangeFilterMaxBytes = Array.empty[Byte]
@@ -455,6 +456,11 @@ case class QueryParam(labelWithDir: LabelWithDirection, 
timestamp: Long = System
     this
   }
 
+  def shouldNormalize(shouldNormalize: Boolean): QueryParam = {
+    this.shouldNormalize = shouldNormalize
+    this
+  }
+
   def isSnapshotEdge = tgtVertexInnerIdOpt.isDefined
 
   override def toString = {

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b58e6f99/s2core/src/main/scala/com/kakao/s2graph/core/rest/RequestParser.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/main/scala/com/kakao/s2graph/core/rest/RequestParser.scala 
b/s2core/src/main/scala/com/kakao/s2graph/core/rest/RequestParser.scala
index beaa151..141b2fe 100644
--- a/s2core/src/main/scala/com/kakao/s2graph/core/rest/RequestParser.scala
+++ b/s2core/src/main/scala/com/kakao/s2graph/core/rest/RequestParser.scala
@@ -298,6 +298,7 @@ class RequestParser(config: Config) extends JSONParser {
       val transformer = if (outputField.isDefined) outputField else 
(labelGroup \ "transform").asOpt[JsValue]
       val scorePropagateOp = (labelGroup \ 
"scorePropagateOp").asOpt[String].getOrElse("multiply")
       val sample = (labelGroup \ "sample").asOpt[Int].getOrElse(-1)
+      val shouldNormalize = (labelGroup \ 
"normalize").asOpt[Boolean].getOrElse(false)
 
       // FIXME: Order of command matter
       QueryParam(labelWithDir)
@@ -321,6 +322,7 @@ class RequestParser(config: Config) extends JSONParser {
         .threshold(threshold)
         .transformer(transformer)
         .scorePropagateOp(scorePropagateOp)
+        .shouldNormalize(shouldNormalize)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b58e6f99/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/AsynchbaseQueryBuilder.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/AsynchbaseQueryBuilder.scala
 
b/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/AsynchbaseQueryBuilder.scala
index b78669c..5628b2e 100644
--- 
a/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/AsynchbaseQueryBuilder.scala
+++ 
b/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/AsynchbaseQueryBuilder.scala
@@ -122,16 +122,30 @@ class AsynchbaseQueryBuilder(storage: 
AsynchbaseStorage)(implicit ec: ExecutionC
           if (randoms.contains(idx)) samples = e :: samples
           idx += 1
         }
-        samples.toSeq
+        samples
       }
     }
 
+    def normalize(edgeWithScores: Seq[EdgeWithScore]): Seq[EdgeWithScore] = {
+      val sum = edgeWithScores.foldLeft(0.0) { case (acc, cur) => acc + 
cur.score }
+      edgeWithScores.map { edgeWithScore =>
+        edgeWithScore.copy(score = edgeWithScore.score / sum)
+      }
+
+    }
+
     def fetchInner(request: GetRequest) = {
+
       storage.client.get(request) withCallback { kvs =>
         val edgeWithScores = storage.toEdges(kvs.toSeq, 
queryRequest.queryParam, prevStepScore, isInnerCall, parentEdges)
-        val resultEdgesWithScores = if (queryRequest.queryParam.sample >= 0) {
-          sample(edgeWithScores, queryRequest.queryParam.sample)
-        } else edgeWithScores
+        val normalized =
+          if (queryRequest.queryParam.shouldNormalize) 
normalize(edgeWithScores)
+          else edgeWithScores
+
+        val resultEdgesWithScores =
+          if (queryRequest.queryParam.sample >= 0) sample(normalized, 
queryRequest.queryParam.sample)
+          else normalized
+
         QueryRequestWithResult(queryRequest, 
QueryResult(resultEdgesWithScores))
       } recoverWith { ex =>
         logger.error(s"fetchQueryParam failed. fallback return.", ex)

Reply via email to