Repository: incubator-s2graph
Updated Branches:
  refs/heads/master e45d69d3c -> d9a7860b0


[S2GRAPH-33]: Support weighted sum of multiple query results.

  Add multiQuery support and refactor unused code.

JIRA:
  [S2GRAPH-33] https://issues.apache.org/jira/browse/S2GRAPH-33

Pull Request:
  Closes #30


Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/d9a7860b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/d9a7860b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/d9a7860b

Branch: refs/heads/master
Commit: d9a7860b03e6631240362561250449092f9b2168
Parents: e45d69d
Author: DO YUNG YOON <[email protected]>
Authored: Tue Feb 23 23:31:59 2016 +0900
Committer: DO YUNG YOON <[email protected]>
Committed: Tue Feb 23 23:31:59 2016 +0900

----------------------------------------------------------------------
 CHANGES                                         |  2 +
 .../com/kakao/s2graph/core/PostProcess.scala    | 33 +++++++++--
 .../com/kakao/s2graph/core/QueryParam.scala     |  1 +
 .../kakao/s2graph/core/rest/RequestParser.scala | 11 ++++
 .../kakao/s2graph/core/rest/RestHandler.scala   | 50 ++++++++++++++++-
 .../s2graph/core/storage/QueryBuilder.scala     | 10 ++--
 .../storage/hbase/AsynchbaseQueryBuilder.scala  | 59 +++++++++++---------
 .../core/storage/hbase/AsynchbaseStorage.scala  |  2 +-
 8 files changed, 127 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/d9a7860b/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index b7c2366..888157c 100644
--- a/CHANGES
+++ b/CHANGES
@@ -13,6 +13,8 @@ Release 0.12.1 - unreleased
     S2GRAPH-32: Support variable such as now, day, hour on query (Committed by 
DOYUNG YOON).
 
     S2GRAPH-45: Provide way to call specific bucket on experiment (Committed 
by DOYUNG YOON).
+  
+    S2GRAPH-33: Support weighted sum of multiple query results (Committed by 
DOYUNG YOON).
 
   IMPROVEMENT
 

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/d9a7860b/s2core/src/main/scala/com/kakao/s2graph/core/PostProcess.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/PostProcess.scala 
b/s2core/src/main/scala/com/kakao/s2graph/core/PostProcess.scala
index 702ccf0..42b0146 100644
--- a/s2core/src/main/scala/com/kakao/s2graph/core/PostProcess.scala
+++ b/s2core/src/main/scala/com/kakao/s2graph/core/PostProcess.scala
@@ -1,12 +1,12 @@
 package com.kakao.s2graph.core
 
 import com.kakao.s2graph.core.GraphExceptions.BadQueryException
-import com.kakao.s2graph.core._
-import com.kakao.s2graph.core.mysqls._
+
+import com.kakao.s2graph.core.mysqls.{ColumnMeta, Label, ServiceColumn, 
LabelMeta}
 import com.kakao.s2graph.core.types.{InnerVal, InnerValLike}
+import com.kakao.s2graph.core.utils.logger
 import play.api.libs.json.{Json, _}
-
-import scala.collection.mutable.ListBuffer
+import scala.collection.mutable.{ArrayBuffer, ListBuffer}
 
 object PostProcess extends JSONParser {
 
@@ -320,7 +320,30 @@ object PostProcess extends JSONParser {
       )
     }
   }
-  
+
+  def toSimpleVertexArrJsonMulti(queryOption: QueryOption,
+                                 resultWithExcludeLs: 
Seq[(Seq[QueryRequestWithResult], Seq[QueryRequestWithResult])],
+                                 excludes: Seq[QueryRequestWithResult]): 
JsValue = {
+    val excludeIds = (Seq((Seq.empty, excludes)) ++ 
resultWithExcludeLs).foldLeft(Map.empty[Int, Boolean]) { case (acc, (result, 
excludes)) =>
+      acc ++ resultInnerIds(excludes).map(hashKey => hashKey -> true).toMap
+    }
+
+    val (degrees, rawEdges) = (ListBuffer.empty[JsValue], 
ListBuffer.empty[RAW_EDGE])
+    for {
+      (result, localExclude) <- resultWithExcludeLs
+    } {
+      val newResult = result.map { queryRequestWithResult =>
+        val (queryRequest, _) = 
QueryRequestWithResult.unapply(queryRequestWithResult).get
+        val newQuery = queryRequest.query.copy(queryOption = queryOption)
+        queryRequestWithResult.copy(queryRequest = queryRequest.copy(query = 
newQuery))
+      }
+      val (_degrees, _rawEdges) = buildRawEdges(queryOption, newResult, 
excludeIds)
+      degrees ++= _degrees
+      rawEdges ++= _rawEdges
+    }
+    buildResultJsValue(queryOption, degrees, rawEdges)
+  }
+
   def toSimpleVertexArrJson(queryOption: QueryOption,
                             queryRequestWithResultLs: 
Seq[QueryRequestWithResult],
                             exclude: Seq[QueryRequestWithResult]): JsValue = {

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/d9a7860b/s2core/src/main/scala/com/kakao/s2graph/core/QueryParam.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/QueryParam.scala 
b/s2core/src/main/scala/com/kakao/s2graph/core/QueryParam.scala
index b886441..a9a5112 100644
--- a/s2core/src/main/scala/com/kakao/s2graph/core/QueryParam.scala
+++ b/s2core/src/main/scala/com/kakao/s2graph/core/QueryParam.scala
@@ -45,6 +45,7 @@ case class QueryOption(removeCycle: Boolean = false,
                        scoreThreshold: Double = Double.MinValue,
                        returnDegree: Boolean = true)
 
+case class MultiQuery(queries: Seq[Query], weights: Seq[Double], queryOption: 
QueryOption)
 
 case class Query(vertices: Seq[Vertex] = Seq.empty[Vertex],
                  steps: IndexedSeq[Step] = Vector.empty[Step],

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/d9a7860b/s2core/src/main/scala/com/kakao/s2graph/core/rest/RequestParser.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/main/scala/com/kakao/s2graph/core/rest/RequestParser.scala 
b/s2core/src/main/scala/com/kakao/s2graph/core/rest/RequestParser.scala
index c7cde59..a87e9d1 100644
--- a/s2core/src/main/scala/com/kakao/s2graph/core/rest/RequestParser.scala
+++ b/s2core/src/main/scala/com/kakao/s2graph/core/rest/RequestParser.scala
@@ -174,6 +174,17 @@ class RequestParser(config: Config) extends JSONParser {
     }
     vertices.toSeq
   }
+
+  def toMultiQuery(jsValue: JsValue, isEdgeQuery: Boolean = true): MultiQuery 
= {
+    val queries = for {
+      queryJson <- (jsValue \ 
"queries").asOpt[Seq[JsValue]].getOrElse(Seq.empty)
+    } yield {
+      toQuery(queryJson, isEdgeQuery)
+    }
+    val weights = (jsValue \ 
"weights").asOpt[Seq[Double]].getOrElse(queries.map(_ => 1.0))
+    MultiQuery(queries = queries, weights = weights, queryOption = 
toQueryOption(jsValue))
+  }
+
   def toQueryOption(jsValue: JsValue): QueryOption = {
     val filterOutFields = (jsValue \ 
"filterOutFields").asOpt[List[String]].getOrElse(List(LabelMeta.to.name))
     val filterOutQuery = (jsValue \ "filterOut").asOpt[JsValue].map { v => 
toQuery(v) }.map { q =>

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/d9a7860b/s2core/src/main/scala/com/kakao/s2graph/core/rest/RestHandler.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/main/scala/com/kakao/s2graph/core/rest/RestHandler.scala 
b/s2core/src/main/scala/com/kakao/s2graph/core/rest/RestHandler.scala
index a9424d0..b130854 100644
--- a/s2core/src/main/scala/com/kakao/s2graph/core/rest/RestHandler.scala
+++ b/s2core/src/main/scala/com/kakao/s2graph/core/rest/RestHandler.scala
@@ -128,13 +128,57 @@ class RestHandler(graph: Graph)(implicit ec: 
ExecutionContext) {
     }
   }
 
-  private def getEdgesAsync(jsonQuery: JsValue)
-                           (post: (Seq[QueryRequestWithResult], 
Seq[QueryRequestWithResult]) => JsValue): Future[JsValue] = {
+  def getEdgesAsync(jsonQuery: JsValue)
+                   (post: (Seq[QueryRequestWithResult], 
Seq[QueryRequestWithResult]) => JsValue): Future[JsValue] = {
 
     val fetch = eachQuery(post) _
     jsonQuery match {
       case JsArray(arr) => 
Future.traverse(arr.map(requestParser.toQuery(_)))(fetch).map(JsArray)
-      case obj@JsObject(_) => fetch(requestParser.toQuery(obj))
+      case obj@JsObject(_) =>
+        (obj \ "queries").asOpt[JsValue] match {
+          case None => fetch(requestParser.toQuery(obj))
+          case _ =>
+            val multiQuery = requestParser.toMultiQuery(obj)
+            val filterOutFuture = multiQuery.queryOption.filterOutQuery match {
+              case Some(filterOutQuery) => graph.getEdges(filterOutQuery)
+              case None => Future.successful(Seq.empty)
+            }
+            val futures = multiQuery.queries.zip(multiQuery.weights).map { 
case (query, weight) =>
+              val filterOutQueryResultsLs = query.queryOption.filterOutQuery 
match {
+                case Some(filterOutQuery) => graph.getEdges(filterOutQuery)
+                case None => Future.successful(Seq.empty)
+              }
+              for {
+                queryRequestWithResultLs <- graph.getEdges(query)
+                filterOutResultsLs <- filterOutQueryResultsLs
+              } yield {
+                val newQueryRequestWithResult = for {
+                  queryRequestWithResult <- queryRequestWithResultLs
+                  queryResult = queryRequestWithResult.queryResult
+                } yield {
+                  val newEdgesWithScores = for {
+                    edgeWithScore <- 
queryRequestWithResult.queryResult.edgeWithScoreLs
+                  } yield {
+                    edgeWithScore.copy(score = edgeWithScore.score * weight)
+                  }
+                  queryRequestWithResult.copy(queryResult = 
queryResult.copy(edgeWithScoreLs = newEdgesWithScores))
+                }
+                logger.debug(s"[Size]: 
${newQueryRequestWithResult.map(_.queryResult.edgeWithScoreLs.size).sum}")
+                (newQueryRequestWithResult, filterOutResultsLs)
+              }
+            }
+            for {
+              filterOut <- filterOutFuture
+              resultWithExcludeLs <- Future.sequence(futures)
+            } yield {
+              PostProcess.toSimpleVertexArrJsonMulti(multiQuery.queryOption, 
resultWithExcludeLs, filterOut)
+              //              val initial = 
(ListBuffer.empty[QueryRequestWithResult], 
ListBuffer.empty[QueryRequestWithResult])
+              //              val (results, excludes) = 
resultWithExcludeLs.foldLeft(initial) { case ((prevResults, prevExcludes), 
(results, excludes)) =>
+              //                (prevResults ++= results, prevExcludes ++= 
excludes)
+              //              }
+              //              
PostProcess.toSimpleVertexArrJson(multiQuery.queryOption, results, excludes ++ 
filterOut)
+            }
+        }
       case _ => throw BadQueryException("Cannot support")
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/d9a7860b/s2core/src/main/scala/com/kakao/s2graph/core/storage/QueryBuilder.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/main/scala/com/kakao/s2graph/core/storage/QueryBuilder.scala 
b/s2core/src/main/scala/com/kakao/s2graph/core/storage/QueryBuilder.scala
index b6380dd..f04e342 100644
--- a/s2core/src/main/scala/com/kakao/s2graph/core/storage/QueryBuilder.scala
+++ b/s2core/src/main/scala/com/kakao/s2graph/core/storage/QueryBuilder.scala
@@ -25,11 +25,11 @@ abstract class QueryBuilder[R, T](storage: 
Storage)(implicit ec: ExecutionContex
               prevStepEdges: Map[VertexId, Seq[EdgeWithScore]]): 
Future[Seq[QueryRequestWithResult]]
 
 
-  def fetchStep(queryRequestWithResultsLs: Seq[QueryRequestWithResult]): 
Future[Seq[QueryRequestWithResult]] = {
+  def fetchStep(orgQuery: Query, queryRequestWithResultsLs: 
Seq[QueryRequestWithResult]): Future[Seq[QueryRequestWithResult]] = {
     if (queryRequestWithResultsLs.isEmpty) Future.successful(Nil)
     else {
       val queryRequest = queryRequestWithResultsLs.head.queryRequest
-      val q = queryRequest.query
+      val q = orgQuery
       val queryResultsLs = queryRequestWithResultsLs.map(_.queryResult)
 
       val stepIdx = queryRequest.stepIdx + 1
@@ -72,10 +72,10 @@ abstract class QueryBuilder[R, T](storage: 
Storage)(implicit ec: ExecutionContex
     }
   }
 
-  def fetchStepFuture(queryRequestWithResultLsFuture: 
Future[Seq[QueryRequestWithResult]]): Future[Seq[QueryRequestWithResult]] = {
+  def fetchStepFuture(orgQuery: Query, queryRequestWithResultLsFuture: 
Future[Seq[QueryRequestWithResult]]): Future[Seq[QueryRequestWithResult]] = {
     for {
       queryRequestWithResultLs <- queryRequestWithResultLsFuture
-      ret <- fetchStep(queryRequestWithResultLs)
+      ret <- fetchStep(orgQuery, queryRequestWithResultLs)
     } yield ret
   }
 
@@ -92,7 +92,7 @@ abstract class QueryBuilder[R, T](storage: Storage)(implicit 
ec: ExecutionContex
         // current stepIdx = -1
         val startQueryResultLs = QueryResult.fromVertices(q)
         q.steps.foldLeft(Future.successful(startQueryResultLs)) { case (acc, 
step) =>
-          fetchStepFuture(acc)
+          fetchStepFuture(q, acc)
         }
       }
     } recover {

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/d9a7860b/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/AsynchbaseQueryBuilder.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/AsynchbaseQueryBuilder.scala
 
b/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/AsynchbaseQueryBuilder.scala
index 5628b2e..337ed3f 100644
--- 
a/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/AsynchbaseQueryBuilder.scala
+++ 
b/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/AsynchbaseQueryBuilder.scala
@@ -22,6 +22,28 @@ class AsynchbaseQueryBuilder(storage: 
AsynchbaseStorage)(implicit ec: ExecutionC
 
   import Extensions.DeferOps
 
+  val maxSize = storage.config.getInt("future.cache.max.size")
+  val expreAfterWrite = 
storage.config.getInt("future.cache.expire.after.write")
+  val expreAfterAccess = 
storage.config.getInt("future.cache.expire.after.access")
+
+  val futureCache = CacheBuilder.newBuilder()
+//  .recordStats()
+  .initialCapacity(maxSize)
+  .concurrencyLevel(Runtime.getRuntime.availableProcessors())
+  .expireAfterWrite(expreAfterWrite, TimeUnit.MILLISECONDS)
+  .expireAfterAccess(expreAfterAccess, TimeUnit.MILLISECONDS)
+//  .weakKeys()
+  .maximumSize(maxSize).build[java.lang.Long, (Long, 
Deferred[QueryRequestWithResult])]()
+
+  //  val scheduleTime = 60L * 60
+//  val scheduleTime = 60
+//  val scheduler = Executors.newScheduledThreadPool(1)
+//
+//  scheduler.scheduleAtFixedRate(new Runnable(){
+//    override def run() = {
+//      logger.info(s"[FutureCache]: ${futureCache.stats()}")
+//    }
+//  }, scheduleTime, scheduleTime, TimeUnit.SECONDS)
 
   override def buildRequest(queryRequest: QueryRequest): GetRequest = {
     val srcVertex = queryRequest.vertex
@@ -89,14 +111,6 @@ class AsynchbaseQueryBuilder(storage: 
AsynchbaseStorage)(implicit ec: ExecutionC
     fetch(queryRequest, 1.0, isInnerCall = true, parentEdges = Nil)
   }
 
-  val maxSize = storage.config.getInt("future.cache.max.size")
-  val futureCacheTTL = 
storage.config.getInt("future.cache.expire.after.access")
-  val futureCache = CacheBuilder.newBuilder()
-    .initialCapacity(maxSize)
-    .concurrencyLevel(Runtime.getRuntime.availableProcessors())
-    .expireAfterAccess(futureCacheTTL, TimeUnit.MILLISECONDS)
-    .maximumSize(maxSize).build[java.lang.Long, (Long, 
Deferred[QueryRequestWithResult])]()
-
   override def fetch(queryRequest: QueryRequest,
                      prevStepScore: Double,
                      isInnerCall: Boolean,
@@ -108,9 +122,9 @@ class AsynchbaseQueryBuilder(storage: 
AsynchbaseStorage)(implicit ec: ExecutionC
     }
 
     def sample(edges: Seq[EdgeWithScore], n: Int): Seq[EdgeWithScore] = {
-      if (edges.size <= n) {
+      if (edges.size <= n){
         edges
-      } else {
+      }else{
         val plainEdges = if (queryRequest.queryParam.offset == 0) {
           edges.tail
         } else edges
@@ -124,26 +138,24 @@ class AsynchbaseQueryBuilder(storage: 
AsynchbaseStorage)(implicit ec: ExecutionC
         }
         samples
       }
-    }
 
+    }
     def normalize(edgeWithScores: Seq[EdgeWithScore]): Seq[EdgeWithScore] = {
       val sum = edgeWithScores.foldLeft(0.0) { case (acc, cur) => acc + 
cur.score }
       edgeWithScores.map { edgeWithScore =>
         edgeWithScore.copy(score = edgeWithScore.score / sum)
       }
-
     }
-
     def fetchInner(request: GetRequest) = {
-
       storage.client.get(request) withCallback { kvs =>
         val edgeWithScores = storage.toEdges(kvs.toSeq, 
queryRequest.queryParam, prevStepScore, isInnerCall, parentEdges)
+
         val normalized =
           if (queryRequest.queryParam.shouldNormalize) 
normalize(edgeWithScores)
           else edgeWithScores
 
         val resultEdgesWithScores =
-          if (queryRequest.queryParam.sample >= 0) sample(normalized, 
queryRequest.queryParam.sample)
+          if (queryRequest.queryParam.sample >= 0 ) sample(normalized, 
queryRequest.queryParam.sample)
           else normalized
 
         QueryRequestWithResult(queryRequest, 
QueryResult(resultEdgesWithScores))
@@ -152,7 +164,6 @@ class AsynchbaseQueryBuilder(storage: 
AsynchbaseStorage)(implicit ec: ExecutionC
         QueryRequestWithResult(queryRequest, QueryResult(isFailure = true))
       }
     }
-
     def checkAndExpire(request: GetRequest,
                        cacheKey: Long,
                        cacheTTL: Long,
@@ -182,12 +193,13 @@ class AsynchbaseQueryBuilder(storage: 
AsynchbaseStorage)(implicit ec: ExecutionC
     val queryParam = queryRequest.queryParam
     val cacheTTL = queryParam.cacheTTLInMillis
     val request = buildRequest(queryRequest)
+
     if (cacheTTL <= 0) fetchInner(request)
     else {
       val cacheKeyBytes = Bytes.add(queryRequest.query.cacheKeyBytes, 
toCacheKeyBytes(request))
       val cacheKey = queryParam.toCacheKey(cacheKeyBytes)
 
-      val cacheVal = futureCache.asMap().get(cacheKey)
+      val cacheVal = futureCache.getIfPresent(cacheKey)
       cacheVal match {
         case null =>
           // here there is no promise set up for this cacheKey so we need to 
set promise on future cache.
@@ -207,6 +219,7 @@ class AsynchbaseQueryBuilder(storage: 
AsynchbaseStorage)(implicit ec: ExecutionC
           checkAndExpire(request, cacheKey, cacheTTL, cachedAt, defer)
       }
     }
+
   }
 
 
@@ -230,16 +243,8 @@ class AsynchbaseQueryBuilder(storage: 
AsynchbaseStorage)(implicit ec: ExecutionC
                        prevStepEdges: Map[VertexId, Seq[EdgeWithScore]]): 
Future[Seq[QueryRequestWithResult]] = {
     val defers: Seq[Deferred[QueryRequestWithResult]] = for {
       (queryRequest, prevStepScore) <- queryRequestWithScoreLs
-    } yield {
-        val prevStepEdgesOpt = prevStepEdges.get(queryRequest.vertex.id)
-        if (prevStepEdgesOpt.isEmpty) throw new RuntimeException("miss match 
on prevStepEdge and current GetRequest")
-
-        val parentEdges = for {
-          parentEdge <- prevStepEdgesOpt.get
-        } yield parentEdge
-
-        fetch(queryRequest, prevStepScore, isInnerCall = false, parentEdges)
-      }
+      parentEdges <- prevStepEdges.get(queryRequest.vertex.id)
+    } yield fetch(queryRequest, prevStepScore, isInnerCall = false, 
parentEdges)
 
     val grouped: Deferred[util.ArrayList[QueryRequestWithResult]] = 
Deferred.group(defers)
     grouped withCallback {

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/d9a7860b/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/AsynchbaseStorage.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/AsynchbaseStorage.scala
 
b/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/AsynchbaseStorage.scala
index edc2780..9f6e91f 100644
--- 
a/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/AsynchbaseStorage.scala
+++ 
b/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/AsynchbaseStorage.scala
@@ -65,7 +65,7 @@ class AsynchbaseStorage(override val config: Config, 
vertexCache: Cache[Integer,
   val queryBuilder = new AsynchbaseQueryBuilder(this)(ec)
   val mutationBuilder = new AsynchbaseMutationBuilder(this)(ec)
 
-//  val cacheOpt = Option(cache)
+  //  val cacheOpt = Option(cache)
   val cacheOpt = None
   val vertexCacheOpt = Option(vertexCache)
 

Reply via email to