Repository: incubator-s2graph Updated Branches: refs/heads/master e45d69d3c -> d9a7860b0
[S2GRAPH-33]: Support weighted sum of multiple query results. add multiQuery support and refactor unused codes. JIRA: [S2GRAPH-33] https://issues.apache.org/jira/browse/S2GRAPH-33 Pull Request: Closes #30 Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/d9a7860b Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/d9a7860b Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/d9a7860b Branch: refs/heads/master Commit: d9a7860b03e6631240362561250449092f9b2168 Parents: e45d69d Author: DO YUNG YOON <[email protected]> Authored: Tue Feb 23 23:31:59 2016 +0900 Committer: DO YUNG YOON <[email protected]> Committed: Tue Feb 23 23:31:59 2016 +0900 ---------------------------------------------------------------------- CHANGES | 2 + .../com/kakao/s2graph/core/PostProcess.scala | 33 +++++++++-- .../com/kakao/s2graph/core/QueryParam.scala | 1 + .../kakao/s2graph/core/rest/RequestParser.scala | 11 ++++ .../kakao/s2graph/core/rest/RestHandler.scala | 50 ++++++++++++++++- .../s2graph/core/storage/QueryBuilder.scala | 10 ++-- .../storage/hbase/AsynchbaseQueryBuilder.scala | 59 +++++++++++--------- .../core/storage/hbase/AsynchbaseStorage.scala | 2 +- 8 files changed, 127 insertions(+), 41 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/d9a7860b/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index b7c2366..888157c 100644 --- a/CHANGES +++ b/CHANGES @@ -13,6 +13,8 @@ Release 0.12.1 - unreleased S2GRAPH-32: Support variable such as now, day, hour on query (Committed by DOYUNG YOON). S2GRAPH-45: Provide way to call specific bucket on experiment (Committed by DOYUNG YOON). + + S2GRAPH-33: Support weighted sum of multiple query results (Committed by DOYUNG YOON). IMPROVEMENT http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/d9a7860b/s2core/src/main/scala/com/kakao/s2graph/core/PostProcess.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/PostProcess.scala b/s2core/src/main/scala/com/kakao/s2graph/core/PostProcess.scala index 702ccf0..42b0146 100644 --- a/s2core/src/main/scala/com/kakao/s2graph/core/PostProcess.scala +++ b/s2core/src/main/scala/com/kakao/s2graph/core/PostProcess.scala @@ -1,12 +1,12 @@ package com.kakao.s2graph.core import com.kakao.s2graph.core.GraphExceptions.BadQueryException -import com.kakao.s2graph.core._ -import com.kakao.s2graph.core.mysqls._ + +import com.kakao.s2graph.core.mysqls.{ColumnMeta, Label, ServiceColumn, LabelMeta} import com.kakao.s2graph.core.types.{InnerVal, InnerValLike} +import com.kakao.s2graph.core.utils.logger import play.api.libs.json.{Json, _} - -import scala.collection.mutable.ListBuffer +import scala.collection.mutable.{ArrayBuffer, ListBuffer} object PostProcess extends JSONParser { @@ -320,7 +320,30 @@ object PostProcess extends JSONParser { ) } } - + + def toSimpleVertexArrJsonMulti(queryOption: QueryOption, + resultWithExcludeLs: Seq[(Seq[QueryRequestWithResult], Seq[QueryRequestWithResult])], + excludes: Seq[QueryRequestWithResult]): JsValue = { + val excludeIds = (Seq((Seq.empty, excludes)) ++ resultWithExcludeLs).foldLeft(Map.empty[Int, Boolean]) { case (acc, (result, excludes)) => + acc ++ resultInnerIds(excludes).map(hashKey => hashKey -> true).toMap + } + + val (degrees, rawEdges) = (ListBuffer.empty[JsValue], ListBuffer.empty[RAW_EDGE]) + for { + (result, localExclude) <- resultWithExcludeLs + } { + val newResult = result.map { queryRequestWithResult => + val (queryRequest, _) = QueryRequestWithResult.unapply(queryRequestWithResult).get + val newQuery = queryRequest.query.copy(queryOption = queryOption) + queryRequestWithResult.copy(queryRequest = queryRequest.copy(query = newQuery)) + } + val (_degrees, _rawEdges) = buildRawEdges(queryOption, newResult, excludeIds) + degrees ++= _degrees + rawEdges ++= _rawEdges + } + buildResultJsValue(queryOption, degrees, rawEdges) + } + def toSimpleVertexArrJson(queryOption: QueryOption, queryRequestWithResultLs: Seq[QueryRequestWithResult], exclude: Seq[QueryRequestWithResult]): JsValue = { http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/d9a7860b/s2core/src/main/scala/com/kakao/s2graph/core/QueryParam.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/QueryParam.scala b/s2core/src/main/scala/com/kakao/s2graph/core/QueryParam.scala index b886441..a9a5112 100644 --- a/s2core/src/main/scala/com/kakao/s2graph/core/QueryParam.scala +++ b/s2core/src/main/scala/com/kakao/s2graph/core/QueryParam.scala @@ -45,6 +45,7 @@ case class QueryOption(removeCycle: Boolean = false, scoreThreshold: Double = Double.MinValue, returnDegree: Boolean = true) +case class MultiQuery(queries: Seq[Query], weights: Seq[Double], queryOption: QueryOption) case class Query(vertices: Seq[Vertex] = Seq.empty[Vertex], steps: IndexedSeq[Step] = Vector.empty[Step], http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/d9a7860b/s2core/src/main/scala/com/kakao/s2graph/core/rest/RequestParser.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/rest/RequestParser.scala b/s2core/src/main/scala/com/kakao/s2graph/core/rest/RequestParser.scala index c7cde59..a87e9d1 100644 --- a/s2core/src/main/scala/com/kakao/s2graph/core/rest/RequestParser.scala +++ b/s2core/src/main/scala/com/kakao/s2graph/core/rest/RequestParser.scala @@ -174,6 +174,17 @@ class RequestParser(config: Config) extends JSONParser { } vertices.toSeq } + + def toMultiQuery(jsValue: JsValue, isEdgeQuery: Boolean = true): MultiQuery = { + val queries = for { + queryJson <- (jsValue \ "queries").asOpt[Seq[JsValue]].getOrElse(Seq.empty) + } yield { + toQuery(queryJson, isEdgeQuery) + } + val weights = (jsValue \ "weights").asOpt[Seq[Double]].getOrElse(queries.map(_ => 1.0)) + MultiQuery(queries = queries, weights = weights, queryOption = toQueryOption(jsValue)) + } + def toQueryOption(jsValue: JsValue): QueryOption = { val filterOutFields = (jsValue \ "filterOutFields").asOpt[List[String]].getOrElse(List(LabelMeta.to.name)) val filterOutQuery = (jsValue \ "filterOut").asOpt[JsValue].map { v => toQuery(v) }.map { q => http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/d9a7860b/s2core/src/main/scala/com/kakao/s2graph/core/rest/RestHandler.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/rest/RestHandler.scala b/s2core/src/main/scala/com/kakao/s2graph/core/rest/RestHandler.scala index a9424d0..b130854 100644 --- a/s2core/src/main/scala/com/kakao/s2graph/core/rest/RestHandler.scala +++ b/s2core/src/main/scala/com/kakao/s2graph/core/rest/RestHandler.scala @@ -128,13 +128,57 @@ class RestHandler(graph: Graph)(implicit ec: ExecutionContext) { } } - private def getEdgesAsync(jsonQuery: JsValue) - (post: (Seq[QueryRequestWithResult], Seq[QueryRequestWithResult]) => JsValue): Future[JsValue] = { + def getEdgesAsync(jsonQuery: JsValue) + (post: (Seq[QueryRequestWithResult], Seq[QueryRequestWithResult]) => JsValue): Future[JsValue] = { val fetch = eachQuery(post) _ jsonQuery match { case JsArray(arr) => Future.traverse(arr.map(requestParser.toQuery(_)))(fetch).map(JsArray) - case obj@JsObject(_) => fetch(requestParser.toQuery(obj)) + case obj@JsObject(_) => + (obj \ "queries").asOpt[JsValue] match { + case None => fetch(requestParser.toQuery(obj)) + case _ => + val multiQuery = requestParser.toMultiQuery(obj) + val filterOutFuture = multiQuery.queryOption.filterOutQuery match { + case Some(filterOutQuery) => graph.getEdges(filterOutQuery) + case None => Future.successful(Seq.empty) + } + val futures = multiQuery.queries.zip(multiQuery.weights).map { case (query, weight) => + val filterOutQueryResultsLs = query.queryOption.filterOutQuery match { + case Some(filterOutQuery) => graph.getEdges(filterOutQuery) + case None => Future.successful(Seq.empty) + } + for { + queryRequestWithResultLs <- graph.getEdges(query) + filterOutResultsLs <- filterOutQueryResultsLs + } yield { + val newQueryRequestWithResult = for { + queryRequestWithResult <- queryRequestWithResultLs + queryResult = queryRequestWithResult.queryResult + } yield { + val newEdgesWithScores = for { + edgeWithScore <- queryRequestWithResult.queryResult.edgeWithScoreLs + } yield { + edgeWithScore.copy(score = edgeWithScore.score * weight) + } + queryRequestWithResult.copy(queryResult = queryResult.copy(edgeWithScoreLs = newEdgesWithScores)) + } + logger.debug(s"[Size]: ${newQueryRequestWithResult.map(_.queryResult.edgeWithScoreLs.size).sum}") + (newQueryRequestWithResult, filterOutResultsLs) + } + } + for { + filterOut <- filterOutFuture + resultWithExcludeLs <- Future.sequence(futures) + } yield { + PostProcess.toSimpleVertexArrJsonMulti(multiQuery.queryOption, resultWithExcludeLs, filterOut) + // val initial = (ListBuffer.empty[QueryRequestWithResult], ListBuffer.empty[QueryRequestWithResult]) + // val (results, excludes) = resultWithExcludeLs.foldLeft(initial) { case ((prevResults, prevExcludes), (results, excludes)) => + // (prevResults ++= results, prevExcludes ++= excludes) + // } + // PostProcess.toSimpleVertexArrJson(multiQuery.queryOption, results, excludes ++ filterOut) + } + } case _ => throw BadQueryException("Cannot support") } } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/d9a7860b/s2core/src/main/scala/com/kakao/s2graph/core/storage/QueryBuilder.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/storage/QueryBuilder.scala b/s2core/src/main/scala/com/kakao/s2graph/core/storage/QueryBuilder.scala index b6380dd..f04e342 100644 --- a/s2core/src/main/scala/com/kakao/s2graph/core/storage/QueryBuilder.scala +++ b/s2core/src/main/scala/com/kakao/s2graph/core/storage/QueryBuilder.scala @@ -25,11 +25,11 @@ abstract class QueryBuilder[R, T](storage: Storage)(implicit ec: ExecutionContex prevStepEdges: Map[VertexId, Seq[EdgeWithScore]]): Future[Seq[QueryRequestWithResult]] - def fetchStep(queryRequestWithResultsLs: Seq[QueryRequestWithResult]): Future[Seq[QueryRequestWithResult]] = { + def fetchStep(orgQuery: Query, queryRequestWithResultsLs: Seq[QueryRequestWithResult]): Future[Seq[QueryRequestWithResult]] = { if (queryRequestWithResultsLs.isEmpty) Future.successful(Nil) else { val queryRequest = queryRequestWithResultsLs.head.queryRequest - val q = queryRequest.query + val q = orgQuery val queryResultsLs = queryRequestWithResultsLs.map(_.queryResult) val stepIdx = queryRequest.stepIdx + 1 @@ -72,10 +72,10 @@ abstract class QueryBuilder[R, T](storage: Storage)(implicit ec: ExecutionContex } } - def fetchStepFuture(queryRequestWithResultLsFuture: Future[Seq[QueryRequestWithResult]]): Future[Seq[QueryRequestWithResult]] = { + def fetchStepFuture(orgQuery: Query, queryRequestWithResultLsFuture: Future[Seq[QueryRequestWithResult]]): Future[Seq[QueryRequestWithResult]] = { for { queryRequestWithResultLs <- queryRequestWithResultLsFuture - ret <- fetchStep(queryRequestWithResultLs) + ret <- fetchStep(orgQuery, queryRequestWithResultLs) } yield ret } @@ -92,7 +92,7 @@ abstract class QueryBuilder[R, T](storage: Storage)(implicit ec: ExecutionContex // current stepIdx = -1 val startQueryResultLs = QueryResult.fromVertices(q) q.steps.foldLeft(Future.successful(startQueryResultLs)) { case (acc, step) => - fetchStepFuture(acc) + fetchStepFuture(q, acc) } } } recover { http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/d9a7860b/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/AsynchbaseQueryBuilder.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/AsynchbaseQueryBuilder.scala b/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/AsynchbaseQueryBuilder.scala index 5628b2e..337ed3f 100644 --- a/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/AsynchbaseQueryBuilder.scala +++ b/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/AsynchbaseQueryBuilder.scala @@ -22,6 +22,28 @@ class AsynchbaseQueryBuilder(storage: AsynchbaseStorage)(implicit ec: ExecutionC import Extensions.DeferOps + val maxSize = storage.config.getInt("future.cache.max.size") + val expreAfterWrite = storage.config.getInt("future.cache.expire.after.write") + val expreAfterAccess = storage.config.getInt("future.cache.expire.after.access") + + val futureCache = CacheBuilder.newBuilder() +// .recordStats() + .initialCapacity(maxSize) + .concurrencyLevel(Runtime.getRuntime.availableProcessors()) + .expireAfterWrite(expreAfterWrite, TimeUnit.MILLISECONDS) + .expireAfterAccess(expreAfterAccess, TimeUnit.MILLISECONDS) +// .weakKeys() + .maximumSize(maxSize).build[java.lang.Long, (Long, Deferred[QueryRequestWithResult])]() + + // val scheduleTime = 60L * 60 +// val scheduleTime = 60 +// val scheduler = Executors.newScheduledThreadPool(1) +// +// scheduler.scheduleAtFixedRate(new Runnable(){ +// override def run() = { +// logger.info(s"[FutureCache]: ${futureCache.stats()}") +// } +// }, scheduleTime, scheduleTime, TimeUnit.SECONDS) override def buildRequest(queryRequest: QueryRequest): GetRequest = { val srcVertex = queryRequest.vertex @@ -89,14 +111,6 @@ class AsynchbaseQueryBuilder(storage: AsynchbaseStorage)(implicit ec: ExecutionC fetch(queryRequest, 1.0, isInnerCall = true, parentEdges = Nil) } - val maxSize = storage.config.getInt("future.cache.max.size") - val futureCacheTTL = storage.config.getInt("future.cache.expire.after.access") - val futureCache = CacheBuilder.newBuilder() - .initialCapacity(maxSize) - .concurrencyLevel(Runtime.getRuntime.availableProcessors()) - .expireAfterAccess(futureCacheTTL, TimeUnit.MILLISECONDS) - .maximumSize(maxSize).build[java.lang.Long, (Long, Deferred[QueryRequestWithResult])]() - override def fetch(queryRequest: QueryRequest, prevStepScore: Double, isInnerCall: Boolean, @@ -108,9 +122,9 @@ class AsynchbaseQueryBuilder(storage: AsynchbaseStorage)(implicit ec: ExecutionC } def sample(edges: Seq[EdgeWithScore], n: Int): Seq[EdgeWithScore] = { - if (edges.size <= n) { + if (edges.size <= n){ edges - } else { + }else{ val plainEdges = if (queryRequest.queryParam.offset == 0) { edges.tail } else edges @@ -124,26 +138,24 @@ class AsynchbaseQueryBuilder(storage: AsynchbaseStorage)(implicit ec: ExecutionC } samples } - } + } def normalize(edgeWithScores: Seq[EdgeWithScore]): Seq[EdgeWithScore] = { val sum = edgeWithScores.foldLeft(0.0) { case (acc, cur) => acc + cur.score } edgeWithScores.map { edgeWithScore => edgeWithScore.copy(score = edgeWithScore.score / sum) } - } - def fetchInner(request: GetRequest) = { - storage.client.get(request) withCallback { kvs => val edgeWithScores = storage.toEdges(kvs.toSeq, queryRequest.queryParam, prevStepScore, isInnerCall, parentEdges) + val normalized = if (queryRequest.queryParam.shouldNormalize) normalize(edgeWithScores) else edgeWithScores val resultEdgesWithScores = - if (queryRequest.queryParam.sample >= 0) sample(normalized, queryRequest.queryParam.sample) + if (queryRequest.queryParam.sample >= 0 ) sample(normalized, queryRequest.queryParam.sample) else normalized QueryRequestWithResult(queryRequest, QueryResult(resultEdgesWithScores)) @@ -152,7 +164,6 @@ class AsynchbaseQueryBuilder(storage: AsynchbaseStorage)(implicit ec: ExecutionC QueryRequestWithResult(queryRequest, QueryResult(isFailure = true)) } } - def checkAndExpire(request: GetRequest, cacheKey: Long, cacheTTL: Long, @@ -182,12 +193,13 @@ class AsynchbaseQueryBuilder(storage: AsynchbaseStorage)(implicit ec: ExecutionC val queryParam = queryRequest.queryParam val cacheTTL = queryParam.cacheTTLInMillis val request = buildRequest(queryRequest) + if (cacheTTL <= 0) fetchInner(request) else { val cacheKeyBytes = Bytes.add(queryRequest.query.cacheKeyBytes, toCacheKeyBytes(request)) val cacheKey = queryParam.toCacheKey(cacheKeyBytes) - val cacheVal = futureCache.asMap().get(cacheKey) + val cacheVal = futureCache.getIfPresent(cacheKey) cacheVal match { case null => // here there is no promise set up for this cacheKey so we need to set promise on future cache. @@ -207,6 +219,7 @@ class AsynchbaseQueryBuilder(storage: AsynchbaseStorage)(implicit ec: ExecutionC checkAndExpire(request, cacheKey, cacheTTL, cachedAt, defer) } } + } @@ -230,16 +243,8 @@ class AsynchbaseQueryBuilder(storage: AsynchbaseStorage)(implicit ec: ExecutionC prevStepEdges: Map[VertexId, Seq[EdgeWithScore]]): Future[Seq[QueryRequestWithResult]] = { val defers: Seq[Deferred[QueryRequestWithResult]] = for { (queryRequest, prevStepScore) <- queryRequestWithScoreLs - } yield { - val prevStepEdgesOpt = prevStepEdges.get(queryRequest.vertex.id) - if (prevStepEdgesOpt.isEmpty) throw new RuntimeException("miss match on prevStepEdge and current GetRequest") - - val parentEdges = for { - parentEdge <- prevStepEdgesOpt.get - } yield parentEdge - - fetch(queryRequest, prevStepScore, isInnerCall = false, parentEdges) - } + parentEdges <- prevStepEdges.get(queryRequest.vertex.id) + } yield fetch(queryRequest, prevStepScore, isInnerCall = false, parentEdges) val grouped: Deferred[util.ArrayList[QueryRequestWithResult]] = Deferred.group(defers) grouped withCallback { http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/d9a7860b/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/AsynchbaseStorage.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/AsynchbaseStorage.scala b/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/AsynchbaseStorage.scala index edc2780..9f6e91f 100644 --- a/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/AsynchbaseStorage.scala +++ b/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/AsynchbaseStorage.scala @@ -65,7 +65,7 @@ class AsynchbaseStorage(override val config: Config, vertexCache: Cache[Integer, val queryBuilder = new AsynchbaseQueryBuilder(this)(ec) val mutationBuilder = new AsynchbaseMutationBuilder(this)(ec) -// val cacheOpt = Option(cache) + // val cacheOpt = Option(cache) val cacheOpt = None val vertexCacheOpt = Option(vertexCache)
