http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/66bdf1bc/s2core/src/main/scala/org/apache/s2graph/core/Graph.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/Graph.scala b/s2core/src/main/scala/org/apache/s2graph/core/Graph.scala index 2cbddea..ca32a14 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/Graph.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/Graph.scala @@ -19,25 +19,31 @@ package org.apache.s2graph.core -import java.util -import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.Executors import com.typesafe.config.{Config, ConfigFactory} +import org.apache.hadoop.fs.Path +import org.apache.s2graph.core.GraphExceptions.{FetchAllStepFailException, FetchTimeoutException, LabelNotExistException} import org.apache.s2graph.core.JSONParser._ -import org.apache.s2graph.core.mysqls.{Label, Model} -import org.apache.s2graph.core.parsers.WhereParser +import org.apache.s2graph.core.mysqls.{Label, LabelMeta, Model, Service} import org.apache.s2graph.core.storage.hbase.AsynchbaseStorage -import org.apache.s2graph.core.types.{InnerVal, LabelWithDirection} -import org.apache.s2graph.core.utils.logger +import org.apache.s2graph.core.storage.{SKeyValue, Storage} +import org.apache.s2graph.core.types._ +import org.apache.s2graph.core.utils.{DeferCache, Extensions, SafeUpdateCache, logger} import play.api.libs.json.{JsObject, Json} +import scala.annotation.tailrec import scala.collection.JavaConversions._ -import scala.collection._ -import scala.collection.mutable.ListBuffer +import scala.collection.mutable.{ArrayBuffer, ListBuffer} import scala.concurrent._ -import scala.util.Try +import scala.util.{Random, Try} object Graph { + + type HashKey = (Int, Int, Int, Int, Boolean) + type FilterHashKey = (Int, Int) + + val DefaultScore = 1.0 private val DefaultConfigs: Map[String, AnyRef] = Map( @@ -65,26 +71,130 @@ object Graph { "future.cache.expire.after.write" -> java.lang.Integer.valueOf(10000), "future.cache.expire.after.access" -> java.lang.Integer.valueOf(5000), "future.cache.metric.interval" -> java.lang.Integer.valueOf(60000), + "query.future.cache.max.size" -> java.lang.Integer.valueOf(1000), + "query.future.cache.expire.after.write" -> java.lang.Integer.valueOf(10000), + "query.future.cache.expire.after.access" -> java.lang.Integer.valueOf(5000), + "query.future.cache.metric.interval" -> java.lang.Integer.valueOf(60000), "s2graph.storage.backend" -> "hbase", - "query.hardlimit" -> java.lang.Integer.valueOf(100000) + "query.hardlimit" -> java.lang.Integer.valueOf(100000), + "hbase.zookeeper.znode.parent" -> "/hbase", + "query.log.sample.rate" -> Double.box(0.05) ) var DefaultConfig: Config = ConfigFactory.parseMap(DefaultConfigs) - /** helpers for filterEdges */ - type HashKey = (Int, Int, Int, Int, Boolean) - type FilterHashKey = (Int, Int) - type Result = (ConcurrentHashMap[HashKey, ListBuffer[(Edge, Double)]], - ConcurrentHashMap[HashKey, (FilterHashKey, Edge, Double)], - ListBuffer[(HashKey, FilterHashKey, Edge, Double)]) + def toGraphElement(s: String, labelMapping: Map[String, String] = Map.empty): Option[GraphElement] = Try { + val parts = GraphUtil.split(s) + val logType = parts(2) + val element = if (logType == "edge" | logType == "e") { + /** current only edge is considered to be bulk loaded */ + labelMapping.get(parts(5)) match { + case None => + case Some(toReplace) => + parts(5) = toReplace + } + toEdge(parts) + } else if (logType == "vertex" | logType == "v") { + toVertex(parts) + } else { + throw new GraphExceptions.JsonParseException("log type is not exist in log.") + } - def toHashKey(queryParam: QueryParam, edge: Edge, isDegree: Boolean): (HashKey, FilterHashKey) = { - val src = edge.srcVertex.innerId.hashCode() - val tgt = edge.tgtVertex.innerId.hashCode() - val hashKey = (src, edge.labelWithDir.labelId, edge.labelWithDir.dir, tgt, isDegree) - val filterHashKey = (src, tgt) + element + } recover { + case e: Exception => + logger.error(s"[toElement]: $e", e) + None + } get - (hashKey, filterHashKey) + + def toVertex(s: String): Option[Vertex] = { + toVertex(GraphUtil.split(s)) + } + + def toEdge(s: String): Option[Edge] = { + toEdge(GraphUtil.split(s)) + } + + def toEdge(parts: Array[String]): Option[Edge] = Try { + val (ts, operation, logType, srcId, tgtId, label) = (parts(0), parts(1), parts(2), parts(3), parts(4), parts(5)) + val props = if (parts.length >= 7) fromJsonToProperties(Json.parse(parts(6)).asOpt[JsObject].getOrElse(Json.obj())) else Map.empty[String, Any] + val tempDirection = if (parts.length >= 8) parts(7) else "out" + val direction = if (tempDirection != "out" && tempDirection != "in") "out" else tempDirection + val edge = Edge.toEdge(srcId, tgtId, label, direction, props, ts.toLong, operation) + Option(edge) + } recover { + case e: Exception => + logger.error(s"[toEdge]: $e", e) + throw e + } get + + def toVertex(parts: Array[String]): Option[Vertex] = Try { + val (ts, operation, logType, srcId, serviceName, colName) = (parts(0), parts(1), parts(2), parts(3), parts(4), parts(5)) + val props = if (parts.length >= 7) fromJsonToProperties(Json.parse(parts(6)).asOpt[JsObject].getOrElse(Json.obj())) else Map.empty[String, Any] + val vertex = Vertex.toVertex(serviceName, colName, srcId, props, ts.toLong, operation) + Option(vertex) + } recover { + case e: Throwable => + logger.error(s"[toVertex]: $e", e) + throw e + } get + + def initStorage(graph: Graph, config: Config)(ec: ExecutionContext): Storage[_, _] = { + val storageBackend = config.getString("s2graph.storage.backend") + logger.info(s"[InitStorage]: $storageBackend") + + storageBackend match { + case "hbase" => new AsynchbaseStorage(graph, config)(ec) + case _ => throw new RuntimeException("not supported storage.") + } + } + + def parseCacheConfig(config: Config, prefix: String): Config = { + import scala.collection.JavaConversions._ + + val kvs = new java.util.HashMap[String, AnyRef]() + for { + entry <- config.entrySet() + (k, v) = (entry.getKey, entry.getValue) if k.startsWith(prefix) + } yield { + val newKey = k.replace(prefix, "") + kvs.put(newKey, v.unwrapped()) + } + ConfigFactory.parseMap(kvs) + } + + /** Global helper functions */ + @tailrec + final def randomInt(sampleNumber: Int, range: Int, set: Set[Int] = Set.empty[Int]): Set[Int] = { + if (range < sampleNumber || set.size == sampleNumber) set + else randomInt(sampleNumber, range, set + Random.nextInt(range)) + } + + def sample(queryRequest: QueryRequest, edges: Seq[EdgeWithScore], n: Int): Seq[EdgeWithScore] = { + if (edges.size <= n) { + edges + } else { + val plainEdges = if (queryRequest.queryParam.offset == 0) { + edges.tail + } else edges + + val randoms = randomInt(n, plainEdges.size) + var samples = List.empty[EdgeWithScore] + var idx = 0 + plainEdges.foreach { e => + if (randoms.contains(idx)) samples = e :: samples + idx += 1 + } + samples + } + } + + def normalize(edgeWithScores: Seq[EdgeWithScore]): Seq[EdgeWithScore] = { + val sum = edgeWithScores.foldLeft(0.0) { case (acc, cur) => acc + cur.score } + edgeWithScores.map { edgeWithScore => + edgeWithScore.copy(score = edgeWithScore.score / sum) + } } def alreadyVisitedVertices(edgeWithScoreLs: Seq[EdgeWithScore]): Map[(LabelWithDirection, Vertex), Boolean] = { @@ -100,7 +210,7 @@ object Graph { /** common methods for filter out, transform, aggregate queryResult */ def convertEdges(queryParam: QueryParam, edge: Edge, nextStepOpt: Option[Step]): Seq[Edge] = { for { - convertedEdge <- queryParam.transformer.transform(edge, nextStepOpt) if !edge.isDegree + convertedEdge <- queryParam.edgeTransformer.transform(queryParam, edge, nextStepOpt) if !edge.isDegree } yield convertedEdge } @@ -110,18 +220,17 @@ object Graph { case None => 1.0 case Some(timeDecay) => val tsVal = try { - val labelMeta = edge.label.metaPropsMap(timeDecay.labelMetaSeq) - val innerValWithTsOpt = edge.propsWithTs.get(timeDecay.labelMetaSeq) + val innerValWithTsOpt = edge.propsWithTs.get(timeDecay.labelMeta) innerValWithTsOpt.map { innerValWithTs => val innerVal = innerValWithTs.innerVal - labelMeta.dataType match { + timeDecay.labelMeta.dataType match { case InnerVal.LONG => innerVal.value match { case n: BigDecimal => n.bigDecimal.longValue() case _ => innerVal.toString().toLong } case _ => innerVal.toString().toLong } - } getOrElse(edge.ts) + } getOrElse (edge.ts) } catch { case e: Exception => logger.error(s"processTimeDecay error. ${edge.toLogString}", e) @@ -134,264 +243,891 @@ object Graph { tsVal } - def processDuplicates(queryParam: QueryParam, - duplicates: Seq[(FilterHashKey, EdgeWithScore)]): Seq[(FilterHashKey, EdgeWithScore)] = { + def processDuplicates[T](queryParam: QueryParam, + duplicates: Seq[(FilterHashKey, T)])(implicit ev: WithScore[T]): Seq[(FilterHashKey, T)] = { if (queryParam.label.consistencyLevel != "strong") { //TODO: queryParam.duplicatePolicy match { - case Query.DuplicatePolicy.First => Seq(duplicates.head) - case Query.DuplicatePolicy.Raw => duplicates - case Query.DuplicatePolicy.CountSum => + case DuplicatePolicy.First => Seq(duplicates.head) + case DuplicatePolicy.Raw => duplicates + case DuplicatePolicy.CountSum => val countSum = duplicates.size - Seq(duplicates.head._1 -> duplicates.head._2.copy(score = countSum)) + val (headFilterHashKey, headEdgeWithScore) = duplicates.head + Seq(headFilterHashKey -> ev.withNewScore(headEdgeWithScore, countSum)) case _ => - val scoreSum = duplicates.foldLeft(0.0) { case (prev, current) => prev + current._2.score } - Seq(duplicates.head._1 -> duplicates.head._2.copy(score = scoreSum)) + val scoreSum = duplicates.foldLeft(0.0) { case (prev, current) => prev + ev.score(current._2) } + val (headFilterHashKey, headEdgeWithScore) = duplicates.head + Seq(headFilterHashKey -> ev.withNewScore(headEdgeWithScore, scoreSum)) } } else { duplicates } } + + def toHashKey(queryParam: QueryParam, edge: Edge, isDegree: Boolean): (HashKey, FilterHashKey) = { + val src = edge.srcVertex.innerId.hashCode() + val tgt = edge.tgtVertex.innerId.hashCode() + val hashKey = (src, edge.labelWithDir.labelId, edge.labelWithDir.dir, tgt, isDegree) + val filterHashKey = (src, tgt) + + (hashKey, filterHashKey) + } + def filterEdges(q: Query, stepIdx: Int, queryRequests: Seq[QueryRequest], - queryResultLsFuture: Future[Seq[StepInnerResult]], + queryResultLsFuture: Future[Seq[StepResult]], queryParams: Seq[QueryParam], - alreadyVisited: Map[(LabelWithDirection, Vertex), Boolean] = Map.empty[(LabelWithDirection, Vertex), Boolean]) - (implicit ec: scala.concurrent.ExecutionContext): Future[StepInnerResult] = { + alreadyVisited: Map[(LabelWithDirection, Vertex), Boolean] = Map.empty, + buildLastStepInnerResult: Boolean = true, + parentEdges: Map[VertexId, Seq[EdgeWithScore]]) + (implicit ec: scala.concurrent.ExecutionContext): Future[StepResult] = { queryResultLsFuture.map { queryRequestWithResultLs => - if (queryRequestWithResultLs.isEmpty) StepInnerResult.Empty + val (cursors, failCount) = { + val _cursors = ArrayBuffer.empty[Array[Byte]] + var _failCount = 0 + + queryRequestWithResultLs.foreach { stepResult => + _cursors.append(stepResult.cursors: _*) + _failCount += stepResult.failCount + } + + _cursors -> _failCount + } + + + if (queryRequestWithResultLs.isEmpty) StepResult.Empty.copy(failCount = failCount) else { + val isLastStep = stepIdx == q.steps.size - 1 + val queryOption = q.queryOption val step = q.steps(stepIdx) - val nextStepOpt = if (stepIdx < q.steps.size - 1) Option(q.steps(stepIdx + 1)) else None - - val excludeLabelWithDirSet = new util.HashSet[(Int, Int)] - val includeLabelWithDirSet = new util.HashSet[(Int, Int)] - step.queryParams.filter(_.exclude).foreach(l => excludeLabelWithDirSet.add(l.labelWithDir.labelId -> l.labelWithDir.dir)) - step.queryParams.filter(_.include).foreach(l => includeLabelWithDirSet.add(l.labelWithDir.labelId -> l.labelWithDir.dir)) - - val edgesToExclude = new util.HashSet[FilterHashKey]() - val edgesToInclude = new util.HashSet[FilterHashKey]() - - val sequentialLs = new ListBuffer[(HashKey, FilterHashKey, EdgeWithScore)]() - val agg = new mutable.HashMap[HashKey, ListBuffer[(FilterHashKey, EdgeWithScore)]]() - val params = new mutable.HashMap[HashKey, QueryParam]() - var numOfDuplicates = 0 - var numOfTotal = 0 - queryRequests.zip(queryRequestWithResultLs).foreach { case (queryRequest, stepInnerResult) => - val queryParam = queryRequest.queryParam - val labelWeight = step.labelWeights.getOrElse(queryParam.labelWithDir.labelId, 1.0) - val includeExcludeKey = queryParam.labelWithDir.labelId -> queryParam.labelWithDir.dir - val shouldBeExcluded = excludeLabelWithDirSet.contains(includeExcludeKey) - val shouldBeIncluded = includeLabelWithDirSet.contains(includeExcludeKey) - val where = queryParam.where.get - - for { - edgeWithScore <- stepInnerResult.edgesWithScoreLs - if where == WhereParser.success || where.filter(edgeWithScore.edge) - (edge, score) = EdgeWithScore.unapply(edgeWithScore).get - } { - numOfTotal += 1 - if (queryParam.transformer.isDefault) { - val convertedEdge = edge - - val (hashKey, filterHashKey) = toHashKey(queryParam, convertedEdge, isDegree = false) - - /** check if this edge should be exlcuded. */ - if (shouldBeExcluded) { - edgesToExclude.add(filterHashKey) - } else { - if (shouldBeIncluded) { - edgesToInclude.add(filterHashKey) - } - val tsVal = processTimeDecay(queryParam, convertedEdge) - val newScore = labelWeight * score * tsVal - val newEdgeWithScore = EdgeWithScore(convertedEdge, newScore) - sequentialLs += ((hashKey, filterHashKey, newEdgeWithScore)) - agg.get(hashKey) match { - case None => - val newLs = new ListBuffer[(FilterHashKey, EdgeWithScore)]() - newLs += (filterHashKey -> newEdgeWithScore) - agg += (hashKey -> newLs) - case Some(old) => - numOfDuplicates += 1 - old += (filterHashKey -> newEdgeWithScore) - } - params += (hashKey -> queryParam) + val currentStepResults = queryRequests.view.zip(queryRequestWithResultLs) + val shouldBuildInnerResults = !isLastStep || buildLastStepInnerResult + val degrees = queryRequestWithResultLs.flatMap(_.degreeEdges) + + if (shouldBuildInnerResults) { + val _results = buildResult(q, stepIdx, currentStepResults, parentEdges) { (edgeWithScore, propsSelectColumns) => + edgeWithScore + } + + /** process step group by */ + val results = StepResult.filterOutStepGroupBy(_results, step.groupBy) + StepResult(edgeWithScores = results, grouped = Nil, degreeEdges = degrees, cursors = cursors, failCount = failCount) + + } else { + val _results = buildResult(q, stepIdx, currentStepResults, parentEdges) { (edgeWithScore, propsSelectColumns) => + val edge = edgeWithScore.edge + val score = edgeWithScore.score + val label = edgeWithScore.label + + /** Select */ + val mergedPropsWithTs = + if (queryOption.selectColumns.isEmpty) { + label.metaPropsDefaultMapInner.map { case (labelMeta, defaultVal) => + labelMeta -> edge.propsWithTs.getOrElse(labelMeta, defaultVal) } } else { - convertEdges(queryParam, edge, nextStepOpt).foreach { convertedEdge => - val (hashKey, filterHashKey) = toHashKey(queryParam, convertedEdge, isDegree = false) - - /** check if this edge should be exlcuded. */ - if (shouldBeExcluded) { - edgesToExclude.add(filterHashKey) - } else { - if (shouldBeIncluded) { - edgesToInclude.add(filterHashKey) - } - val tsVal = processTimeDecay(queryParam, convertedEdge) - val newScore = labelWeight * score * tsVal - val newEdgeWithScore = EdgeWithScore(convertedEdge, newScore) - sequentialLs += ((hashKey, filterHashKey, newEdgeWithScore)) - agg.get(hashKey) match { - case None => - val newLs = new ListBuffer[(FilterHashKey, EdgeWithScore)]() - newLs += (filterHashKey -> newEdgeWithScore) - agg += (hashKey -> newLs) - case Some(old) => - numOfDuplicates += 1 - old += (filterHashKey -> newEdgeWithScore) - } - params += (hashKey -> queryParam) - } + val initial = Map(LabelMeta.timestamp -> edge.propsWithTs(LabelMeta.timestamp)) + propsSelectColumns.foldLeft(initial) { case (prev, labelMeta) => + prev + (labelMeta -> edge.propsWithTs.getOrElse(labelMeta, label.metaPropsDefaultMapInner(labelMeta))) } } - } - } + val newEdge = edge.copy(propsWithTs = mergedPropsWithTs) + val newEdgeWithScore = edgeWithScore.copy(edge = newEdge) + /** OrderBy */ + val orderByValues = + if (queryOption.orderByKeys.isEmpty) (score, edge.tsInnerVal, None, None) + else StepResult.toTuple4(newEdgeWithScore.toValues(queryOption.orderByKeys)) + + /** StepGroupBy */ + val stepGroupByValues = newEdgeWithScore.toValues(step.groupBy.keys) + + /** GroupBy */ + val groupByValues = newEdgeWithScore.toValues(queryOption.groupBy.keys) + + /** FilterOut */ + val filterOutValues = newEdgeWithScore.toValues(queryOption.filterOutFields) - val edgeWithScoreLs = new ListBuffer[EdgeWithScore]() - if (numOfDuplicates == 0) { - // no duplicates at all. - for { - (hashKey, filterHashKey, edgeWithScore) <- sequentialLs - if !edgesToExclude.contains(filterHashKey) || edgesToInclude.contains(filterHashKey) - } { - edgeWithScoreLs += edgeWithScore + newEdgeWithScore.copy(orderByValues = orderByValues, + stepGroupByValues = stepGroupByValues, + groupByValues = groupByValues, + filterOutValues = filterOutValues) } - } else { - // need to resolve duplicates. - val seen = new mutable.HashSet[HashKey]() - for { - (hashKey, filterHashKey, edgeWithScore) <- sequentialLs - if !seen.contains(hashKey) - if !edgesToExclude.contains(filterHashKey) || edgesToInclude.contains(filterHashKey) - } { - val queryParam = params(hashKey) - val duplicates = processDuplicates(queryParam, agg(hashKey)) - duplicates.foreach { case (_, duplicate) => - if (duplicate.score >= queryParam.threshold) { - seen += hashKey - edgeWithScoreLs += duplicate + + /** process step group by */ + val results = StepResult.filterOutStepGroupBy(_results, step.groupBy) + + /** process ordered list */ + val ordered = if (queryOption.groupBy.keys.isEmpty) StepResult.orderBy(queryOption, results) else Nil + + /** process grouped list */ + val grouped = + if (queryOption.groupBy.keys.isEmpty) Nil + else { + val agg = new scala.collection.mutable.HashMap[StepResult.GroupByKey, (Double, StepResult.Values)]() + results.groupBy { edgeWithScore => + // edgeWithScore.groupByValues.map(_.map(_.toString)) + edgeWithScore.groupByValues + }.foreach { case (k, ls) => + val (scoreSum, merged) = StepResult.mergeOrdered(ls, Nil, queryOption) + + val newScoreSum = scoreSum + + /** + * watch out here. by calling toString on Any, we lose type information which will be used + * later for toJson. + */ + if (merged.nonEmpty) { + val newKey = merged.head.groupByValues + agg += (newKey -> (newScoreSum, merged)) } } + agg.toSeq.sortBy(_._2._1 * -1) } - } - val degrees = queryRequestWithResultLs.flatMap(_.degreeEdges) - StepInnerResult(edgesWithScoreLs = edgeWithScoreLs, degreeEdges = degrees) + StepResult(edgeWithScores = ordered, grouped = grouped, degreeEdges = degrees, cursors = cursors, failCount = failCount) + } } } } - def toGraphElement(s: String, labelMapping: Map[String, String] = Map.empty): Option[GraphElement] = Try { - val parts = GraphUtil.split(s) - val logType = parts(2) - val element = if (logType == "edge" | logType == "e") { - /* current only edge is considered to be bulk loaded */ - labelMapping.get(parts(5)) match { - case None => - case Some(toReplace) => - parts(5) = toReplace + private def toEdgeWithScores(queryRequest: QueryRequest, + stepResult: StepResult, + parentEdges: Map[VertexId, Seq[EdgeWithScore]]): Seq[EdgeWithScore] = { + val queryOption = queryRequest.query.queryOption + val queryParam = queryRequest.queryParam + val prevScore = queryRequest.prevStepScore + val labelWeight = queryRequest.labelWeight + val edgeWithScores = stepResult.edgeWithScores + + val shouldBuildParents = queryOption.returnTree || queryParam.whereHasParent + val parents = if (shouldBuildParents) { + parentEdges.getOrElse(queryRequest.vertex.id, Nil).map { edgeWithScore => + val edge = edgeWithScore.edge + val score = edgeWithScore.score + val label = edgeWithScore.label + + /** Select */ + val mergedPropsWithTs = + if (queryOption.selectColumns.isEmpty) { + label.metaPropsDefaultMapInner.map { case (labelMeta, defaultVal) => + labelMeta -> edge.propsWithTs.getOrElse(labelMeta, defaultVal) + } + } else { + val initial = Map(LabelMeta.timestamp -> edge.propsWithTs(LabelMeta.timestamp)) + queryOption.selectColumns.foldLeft(initial) { case (acc, labelMetaName) => + label.metaPropsDefaultMapInnerString.get(labelMetaName) match { + case None => acc + case Some(defaultValue) => + val labelMeta = label.metaPropsInvMap(labelMetaName) + acc + (labelMeta -> edge.propsWithTs.getOrElse(labelMeta, defaultValue)) + } + } + } + + val newEdge = edge.copy(propsWithTs = mergedPropsWithTs) + edgeWithScore.copy(edge = newEdge) } - toEdge(parts) - } else if (logType == "vertex" | logType == "v") { - toVertex(parts) - } else { - throw new GraphExceptions.JsonParseException("log type is not exist in log.") - } + } else Nil - element - } recover { - case e: Exception => - logger.error(s"[toElement]: $e", e) - None - } get + // skip + if (queryOption.ignorePrevStepCache) stepResult.edgeWithScores + else { + val degreeScore = 0.0 + val sampled = + if (queryRequest.queryParam.sample >= 0) sample(queryRequest, edgeWithScores, queryRequest.queryParam.sample) + else edgeWithScores - def toVertex(s: String): Option[Vertex] = { - toVertex(GraphUtil.split(s)) - } + val withScores = for { + edgeWithScore <- sampled + } yield { + val edge = edgeWithScore.edge + val edgeScore = edgeWithScore.score + val score = queryParam.scorePropagateOp match { + case "plus" => edgeScore + prevScore + case "divide" => + if ((prevScore + queryParam.scorePropagateShrinkage) == 0) 0 + else edgeScore / (prevScore + queryParam.scorePropagateShrinkage) + case _ => edgeScore * prevScore + } - def toEdge(s: String): Option[Edge] = { - toEdge(GraphUtil.split(s)) + val tsVal = processTimeDecay(queryParam, edge) + val newScore = degreeScore + score + // val newEdge = if (queryOption.returnTree) edge.copy(parentEdges = parents) else edge + val newEdge = edge.copy(parentEdges = parents) + edgeWithScore.copy(edge = newEdge, score = newScore * labelWeight * tsVal) + } + + val normalized = + if (queryParam.shouldNormalize) normalize(withScores) + else withScores + + normalized + } } - def toEdge(parts: Array[String]): Option[Edge] = Try { - val (ts, operation, logType, srcId, tgtId, label) = (parts(0), parts(1), parts(2), parts(3), parts(4), parts(5)) - val props = if (parts.length >= 7) fromJsonToProperties(Json.parse(parts(6)).asOpt[JsObject].getOrElse(Json.obj())) else Map.empty[String, Any] - val tempDirection = if (parts.length >= 8) parts(7) else "out" - val direction = if (tempDirection != "out" && tempDirection != "in") "out" else tempDirection - val edge = Edge.toEdge(srcId, tgtId, label, direction, props, ts.toLong, operation) - Option(edge) - } recover { - case e: Exception => - logger.error(s"[toEdge]: $e", e) - throw e - } get + private def buildResult[T](query: Query, + stepIdx: Int, + stepResultLs: Seq[(QueryRequest, StepResult)], + parentEdges: Map[VertexId, Seq[EdgeWithScore]]) + (createFunc: (EdgeWithScore, Set[LabelMeta]) => T) + (implicit ev: WithScore[T]): ListBuffer[T] = { + import scala.collection._ - def toVertex(parts: Array[String]): Option[Vertex] = Try { - val (ts, operation, logType, srcId, serviceName, colName) = (parts(0), parts(1), parts(2), parts(3), parts(4), parts(5)) - val props = if (parts.length >= 7) fromJsonToProperties(Json.parse(parts(6)).asOpt[JsObject].getOrElse(Json.obj())) else Map.empty[String, Any] - val vertex = Vertex.toVertex(serviceName, colName, srcId, props, ts.toLong, operation) - Option(vertex) - } recover { - case e: Throwable => - logger.error(s"[toVertex]: $e", e) - throw e - } get + val results = ListBuffer.empty[T] + val sequentialLs: ListBuffer[(HashKey, FilterHashKey, T, QueryParam)] = ListBuffer.empty + val duplicates: mutable.HashMap[HashKey, ListBuffer[(FilterHashKey, T)]] = mutable.HashMap.empty + val edgesToExclude: mutable.HashSet[FilterHashKey] = mutable.HashSet.empty + val edgesToInclude: mutable.HashSet[FilterHashKey] = mutable.HashSet.empty - def initStorage(graph: Graph, config: Config)(ec: ExecutionContext) = { - config.getString("s2graph.storage.backend") match { - case "hbase" => new AsynchbaseStorage(graph, config)(ec) - case _ => throw new RuntimeException("not supported storage.") + var numOfDuplicates = 0 + val queryOption = query.queryOption + val step = query.steps(stepIdx) + val excludeLabelWithDirSet = step.queryParams.filter(_.exclude).map(l => l.labelWithDir).toSet + val includeLabelWithDirSet = step.queryParams.filter(_.include).map(l => l.labelWithDir).toSet + + stepResultLs.foreach { case (queryRequest, stepInnerResult) => + val queryParam = queryRequest.queryParam + val label = queryParam.label + val shouldBeExcluded = excludeLabelWithDirSet.contains(queryParam.labelWithDir) + val shouldBeIncluded = includeLabelWithDirSet.contains(queryParam.labelWithDir) + + val propsSelectColumns = (for { + column <- queryOption.propsSelectColumns + labelMeta <- label.metaPropsInvMap.get(column) + } yield labelMeta).toSet + + for { + edgeWithScore <- toEdgeWithScores(queryRequest, stepInnerResult, parentEdges) + } { + val edge = edgeWithScore.edge + val (hashKey, filterHashKey) = toHashKey(queryParam, edge, isDegree = false) + // params += (hashKey -> queryParam) // + + /** check if this edge should be exlcuded. */ + if (shouldBeExcluded) { + edgesToExclude.add(filterHashKey) + } else { + if (shouldBeIncluded) { + edgesToInclude.add(filterHashKey) + } + val newEdgeWithScore = createFunc(edgeWithScore, propsSelectColumns) + + sequentialLs += ((hashKey, filterHashKey, newEdgeWithScore, queryParam)) + duplicates.get(hashKey) match { + case None => + val newLs = ListBuffer.empty[(FilterHashKey, T)] + newLs += (filterHashKey -> newEdgeWithScore) + duplicates += (hashKey -> newLs) // + case Some(old) => + numOfDuplicates += 1 + old += (filterHashKey -> newEdgeWithScore) // + } + } + } } + + + if (numOfDuplicates == 0) { + // no duplicates at all. + for { + (hashKey, filterHashKey, edgeWithScore, _) <- sequentialLs + if !edgesToExclude.contains(filterHashKey) || edgesToInclude.contains(filterHashKey) + } { + results += edgeWithScore + } + } else { + // need to resolve duplicates. + val seen = new mutable.HashSet[HashKey]() + for { + (hashKey, filterHashKey, edgeWithScore, queryParam) <- sequentialLs + if !edgesToExclude.contains(filterHashKey) || edgesToInclude.contains(filterHashKey) + if !seen.contains(hashKey) + } { + // val queryParam = params(hashKey) + processDuplicates(queryParam, duplicates(hashKey)).foreach { case (_, duplicate) => + if (ev.score(duplicate) >= queryParam.threshold) { + seen += hashKey + results += duplicate + } + } + } + } + results } + } class Graph(_config: Config)(implicit val ec: ExecutionContext) { + + import Graph._ + val config = _config.withFallback(Graph.DefaultConfig) Model.apply(config) Model.loadCache() - // TODO: Make storage client by config param - val storage = Graph.initStorage(this, config)(ec) + val MaxRetryNum = config.getInt("max.retry.number") + val MaxBackOff = config.getInt("max.back.off") + val BackoffTimeout = config.getInt("back.off.timeout") + val DeleteAllFetchCount = config.getInt("delete.all.fetch.count") + val DeleteAllFetchSize = config.getInt("delete.all.fetch.size") + val FailProb = config.getDouble("hbase.fail.prob") + val LockExpireDuration = config.getInt("lock.expire.time") + val MaxSize = config.getInt("future.cache.max.size") + val ExpireAfterWrite = config.getInt("future.cache.expire.after.write") + val ExpireAfterAccess = config.getInt("future.cache.expire.after.access") + val scheduledEx = ExecutionContext.fromExecutor(Executors.newSingleThreadExecutor()) + + private def confWithFallback(conf: Config): Config = { + conf.withFallback(config) + } + + /** + * TODO: we need to some way to handle malformed configuration for storage. + */ + val storagePool: scala.collection.mutable.Map[String, Storage[_, _]] = { + val labels = Label.findAll() + val services = Service.findAll() + + val labelConfigs = labels.flatMap(_.toStorageConfig) + val serviceConfigs = services.flatMap(_.toStorageConfig) + + val configs = (labelConfigs ++ serviceConfigs).map { conf => + confWithFallback(conf) + }.toSet + + val pools = new java.util.HashMap[Config, Storage[_, _]]() + configs.foreach { config => + pools.put(config, Graph.initStorage(this, config)(ec)) + } + + val m = new java.util.concurrent.ConcurrentHashMap[String, Storage[_, _]]() + + labels.foreach { label => + if (label.storageConfigOpt.isDefined) { + m += (s"label:${label.label}" -> pools(label.storageConfigOpt.get)) + } + } + + services.foreach { service => + if (service.storageConfigOpt.isDefined) { + m += (s"service:${service.serviceName}" -> pools(service.storageConfigOpt.get)) + } + } + + m + } + + val defaultStorage: Storage[_, _] = Graph.initStorage(this, config)(ec) + + /** QueryLevel FutureCache */ + val queryFutureCache = new DeferCache[StepResult, Promise, Future](parseCacheConfig(config, "query."), empty = StepResult.Empty) for { entry <- config.entrySet() if Graph.DefaultConfigs.contains(entry.getKey) (k, v) = (entry.getKey, entry.getValue) } logger.info(s"[Initialized]: $k, ${this.config.getAnyRef(k)}") - /** select */ - def checkEdges(params: Seq[(Vertex, Vertex, QueryParam)]): Future[StepResult] = storage.checkEdges(params) + def getStorage(service: Service): Storage[_, _] = { + storagePool.getOrElse(s"service:${service.serviceName}", defaultStorage) + } + + def getStorage(label: Label): Storage[_, _] = { + storagePool.getOrElse(s"label:${label.label}", defaultStorage) + } + + def flushStorage(): Unit = { + storagePool.foreach { case (_, storage) => + + /** flush is blocking */ + storage.flush() + } + } + + def fallback = Future.successful(StepResult.Empty) + + def checkEdges(edges: Seq[Edge]): Future[StepResult] = { + val futures = for { + edge <- edges + } yield { + fetchSnapshotEdge(edge).map { case (queryParam, edgeOpt, kvOpt) => + edgeOpt.toSeq.map(e => EdgeWithScore(e, 1.0, queryParam.label)) + } + } + + Future.sequence(futures).map { edgeWithScoreLs => + val edgeWithScores = edgeWithScoreLs.flatten + StepResult(edgeWithScores = edgeWithScores, grouped = Nil, degreeEdges = Nil) + } + } + + // def checkEdges(edges: Seq[Edge]): Future[StepResult] = storage.checkEdges(edges) + + def getEdges(q: Query): Future[StepResult] = { + Try { + if (q.steps.isEmpty) { + // TODO: this should be get vertex query. + fallback + } else { + val filterOutFuture = q.queryOption.filterOutQuery match { + case None => fallback + case Some(filterOutQuery) => getEdgesStepInner(filterOutQuery) + } + for { + stepResult <- getEdgesStepInner(q) + filterOutInnerResult <- filterOutFuture + } yield { + if (filterOutInnerResult.isEmpty) stepResult + else StepResult.filterOut(this, q.queryOption, stepResult, filterOutInnerResult) + } + } + } recover { + case e: Exception => + logger.error(s"getEdgesAsync: $e", e) + fallback + } get + } + + def getEdgesStepInner(q: Query, buildLastStepInnerResult: Boolean = false): Future[StepResult] = { + Try { + if (q.steps.isEmpty) fallback + else { + + val queryOption = q.queryOption + def fetch: Future[StepResult] = { + val startStepInnerResult = QueryResult.fromVertices(this, q) + q.steps.zipWithIndex.foldLeft(Future.successful(startStepInnerResult)) { case (prevStepInnerResultFuture, (step, stepIdx)) => + for { + prevStepInnerResult <- prevStepInnerResultFuture + currentStepInnerResult <- fetchStep(q, stepIdx, prevStepInnerResult, buildLastStepInnerResult) + } yield { + currentStepInnerResult.copy( + accumulatedCursors = prevStepInnerResult.accumulatedCursors :+ currentStepInnerResult.cursors, + failCount = currentStepInnerResult.failCount + prevStepInnerResult.failCount + ) + } + } + } + + fetch + } + } recover { + case e: Exception => + logger.error(s"getEdgesAsync: $e", e) + fallback + } get + } + + def fetchStep(orgQuery: Query, + stepIdx: Int, + stepInnerResult: StepResult, + buildLastStepInnerResult: Boolean = false): Future[StepResult] = { + if (stepInnerResult.isEmpty) Future.successful(StepResult.Empty) + else { + val edgeWithScoreLs = stepInnerResult.edgeWithScores + + val q = orgQuery + val queryOption = orgQuery.queryOption + val prevStepOpt = if (stepIdx > 0) Option(q.steps(stepIdx - 1)) else None + val prevStepThreshold = prevStepOpt.map(_.nextStepScoreThreshold).getOrElse(QueryParam.DefaultThreshold) + val prevStepLimit = prevStepOpt.map(_.nextStepLimit).getOrElse(-1) + val step = q.steps(stepIdx) + + val alreadyVisited = + if (stepIdx == 0) Map.empty[(LabelWithDirection, Vertex), Boolean] + else alreadyVisitedVertices(stepInnerResult.edgeWithScores) + + val initial = (Map.empty[Vertex, Double], Map.empty[Vertex, ArrayBuffer[EdgeWithScore]]) + val (sums, grouped) = edgeWithScoreLs.foldLeft(initial) { case ((sum, group), edgeWithScore) => + val key = edgeWithScore.edge.tgtVertex + val newScore = sum.getOrElse(key, 0.0) + edgeWithScore.score + val buffer = group.getOrElse(key, ArrayBuffer.empty[EdgeWithScore]) + buffer += edgeWithScore + (sum + (key -> newScore), group + (key -> buffer)) + } + val groupedByFiltered = sums.filter(t => t._2 >= prevStepThreshold) + val prevStepTgtVertexIdEdges = grouped.map(t => t._1.id -> t._2) + + val nextStepSrcVertices = if (prevStepLimit >= 0) { + groupedByFiltered.toSeq.sortBy(-1 * _._2).take(prevStepLimit) + } else { + groupedByFiltered.toSeq + } + + val queryRequests = for { + (vertex, prevStepScore) <- nextStepSrcVertices + queryParam <- step.queryParams + } yield { + val labelWeight = step.labelWeights.getOrElse(queryParam.labelWithDir.labelId, 1.0) + val newPrevStepScore = if (queryOption.shouldPropagateScore) prevStepScore else 1.0 + QueryRequest(q, stepIdx, vertex, queryParam, newPrevStepScore, labelWeight) + } + + val fetchedLs = fetches(queryRequests, prevStepTgtVertexIdEdges) + + filterEdges(orgQuery, stepIdx, queryRequests, + fetchedLs, orgQuery.steps(stepIdx).queryParams, alreadyVisited, buildLastStepInnerResult, prevStepTgtVertexIdEdges)(ec) + } + } + + + /** + * responsible to fire parallel fetch call into storage and create future that will return merged result. + * + * @param queryRequests + * @param prevStepEdges + * @return + */ + def fetches(queryRequests: Seq[QueryRequest], + prevStepEdges: Map[VertexId, Seq[EdgeWithScore]]): Future[Seq[StepResult]] = { + + val reqWithIdxs = queryRequests.zipWithIndex + val requestsPerLabel = reqWithIdxs.groupBy(t => t._1.queryParam.label) + val aggFuture = requestsPerLabel.foldLeft(Future.successful(Map.empty[Int, StepResult])) { case (prevFuture, (label, reqWithIdxs)) => + for { + prev <- prevFuture + cur <- getStorage(label).fetches(reqWithIdxs.map(_._1), prevStepEdges) + } yield { + prev ++ reqWithIdxs.map(_._2).zip(cur).toMap + } + } + aggFuture.map { agg => agg.toSeq.sortBy(_._1).map(_._2) } + } + + + def getEdgesMultiQuery(mq: MultiQuery): Future[StepResult] = { + Try { + if (mq.queries.isEmpty) fallback + else { + val filterOutFuture = mq.queryOption.filterOutQuery match { + case None => fallback + case Some(filterOutQuery) => getEdgesStepInner(filterOutQuery) + } + + val multiQueryFutures = Future.sequence(mq.queries.map { query => getEdges(query) }) + for { + multiQueryResults <- multiQueryFutures + filterOutInnerResult <- filterOutFuture + } yield { + StepResult.merges(mq.queryOption, multiQueryResults, mq.weights, filterOutInnerResult) + } + } + } recover { + case e: Exception => + logger.error(s"getEdgesAsync: $e", e) + fallback + } get + } + + + def fetchSnapshotEdge(edge: Edge): Future[(QueryParam, Option[Edge], Option[SKeyValue])] = { + /** TODO: Fix this. currently fetchSnapshotEdge should not use future cache + * so use empty cacheKey. + * */ + val queryParam = QueryParam(labelName = edge.label.label, + direction = GraphUtil.fromDirection(edge.labelWithDir.dir), + tgtVertexIdOpt = Option(edge.tgtVertex.innerIdVal), + cacheTTLInMillis = -1) + val q = Query.toQuery(Seq(edge.srcVertex), queryParam) + val queryRequest = QueryRequest(q, 0, edge.srcVertex, queryParam) + + val storage = getStorage(edge.label) + storage.fetchSnapshotEdgeKeyValues(queryRequest).map { kvs => + val (edgeOpt, kvOpt) = + if (kvs.isEmpty) (None, None) + else { + val snapshotEdgeOpt = storage.toSnapshotEdge(kvs.head, queryRequest, isInnerCall = true, parentEdges = Nil) + val _kvOpt = kvs.headOption + (snapshotEdgeOpt, _kvOpt) + } + (queryParam, edgeOpt, kvOpt) + } recoverWith { case ex: Throwable => + logger.error(s"fetchQueryParam failed. fallback return.", ex) + throw FetchTimeoutException(s"${edge.toLogString}") + } + } + + def getVertices(vertices: Seq[Vertex]): Future[Seq[Vertex]] = { + val verticesWithIdx = vertices.zipWithIndex + val futures = verticesWithIdx.groupBy { case (v, idx) => v.service }.map { case (service, vertexGroup) => + getStorage(service).getVertices(vertexGroup.map(_._1)).map(_.zip(vertexGroup.map(_._2))) + } + + Future.sequence(futures).map { ls => + ls.flatten.toSeq.sortBy(_._2).map(_._1) + } + } + + /** mutate */ + def deleteAllAdjacentEdges(srcVertices: Seq[Vertex], + labels: Seq[Label], + dir: Int, + ts: Long): Future[Boolean] = { + + val requestTs = ts + val vertices = srcVertices + /** create query per label */ + val queries = for { + label <- labels + } yield { + val queryParam = QueryParam(labelName = label.label, direction = GraphUtil.fromDirection(dir), + offset = 0, limit = DeleteAllFetchSize, duplicatePolicy = DuplicatePolicy.Raw) + val step = Step(List(queryParam)) + Query(vertices, Vector(step)) + } + + // Extensions.retryOnSuccessWithBackoff(MaxRetryNum, Random.nextInt(MaxBackOff) + 1) { + val retryFuture = Extensions.retryOnSuccess(DeleteAllFetchCount) { + fetchAndDeleteAll(queries, requestTs) + } { case (allDeleted, deleteSuccess) => + allDeleted && deleteSuccess + }.map { case (allDeleted, deleteSuccess) => allDeleted && deleteSuccess } + + retryFuture onFailure { + case ex => + logger.error(s"[Error]: deleteAllAdjacentEdges failed.") + } - def getEdges(q: Query): Future[StepResult] = storage.getEdges(q) + retryFuture + } - def getEdgesMultiQuery(mq: MultiQuery): Future[StepResult] = storage.getEdgesMultiQuery(mq) + def fetchAndDeleteAll(queries: Seq[Query], requestTs: Long): Future[(Boolean, Boolean)] = { + val future = for { + stepInnerResultLs <- Future.sequence(queries.map(getEdgesStepInner(_, true))) + (allDeleted, ret) <- deleteAllFetchedEdgesLs(stepInnerResultLs, requestTs) + } yield { + // logger.debug(s"fetchAndDeleteAll: ${allDeleted}, ${ret}") + (allDeleted, ret) + } - def getVertices(vertices: Seq[Vertex]): Future[Seq[Vertex]] = storage.getVertices(vertices) + Extensions.retryOnFailure(MaxRetryNum) { + future + } { + logger.error(s"fetch and deleteAll failed.") + (true, false) + } - /** write */ - def deleteAllAdjacentEdges(srcVertices: List[Vertex], labels: Seq[Label], dir: Int, ts: Long): Future[Boolean] = - storage.deleteAllAdjacentEdges(srcVertices, labels, dir, ts) + } - def mutateElements(elements: Seq[GraphElement], withWait: Boolean = false): Future[Seq[Boolean]] = - storage.mutateElements(elements, withWait) + def deleteAllFetchedEdgesLs(stepInnerResultLs: Seq[StepResult], + requestTs: Long): Future[(Boolean, Boolean)] = { + stepInnerResultLs.foreach { stepInnerResult => + if (stepInnerResult.isFailure) throw new RuntimeException("fetched result is fallback.") + } + val futures = for { + stepInnerResult <- stepInnerResultLs + deleteStepInnerResult = buildEdgesToDelete(stepInnerResult, requestTs) + if deleteStepInnerResult.edgeWithScores.nonEmpty + } yield { + val head = deleteStepInnerResult.edgeWithScores.head + val label = head.edge.label + val ret = label.schemaVersion match { + case HBaseType.VERSION3 | HBaseType.VERSION4 => + if (label.consistencyLevel == "strong") { + /** + * read: snapshotEdge on queryResult = O(N) + * write: N x (relatedEdges x indices(indexedEdge) + 1(snapshotEdge)) + */ + mutateEdges(deleteStepInnerResult.edgeWithScores.map(_.edge), withWait = true).map(_.forall(identity)) + } else { + getStorage(label).deleteAllFetchedEdgesAsyncOld(deleteStepInnerResult, requestTs, MaxRetryNum) + } + case _ => - def mutateEdges(edges: Seq[Edge], withWait: Boolean = false): Future[Seq[Boolean]] = storage.mutateEdges(edges, withWait) + /** + * read: x + * write: N x ((1(snapshotEdge) + 2(1 for incr, 1 for delete) x indices) + */ + getStorage(label).deleteAllFetchedEdgesAsyncOld(deleteStepInnerResult, requestTs, MaxRetryNum) + } + ret + } - def mutateVertices(vertices: Seq[Vertex], withWait: Boolean = false): Future[Seq[Boolean]] = storage.mutateVertices(vertices, withWait) + if (futures.isEmpty) { + // all deleted. + Future.successful(true -> true) + } else { + Future.sequence(futures).map { rets => false -> rets.forall(identity) } + } + } - def incrementCounts(edges: Seq[Edge], withWait: Boolean): Future[Seq[(Boolean, Long)]] = storage.incrementCounts(edges, withWait) + def buildEdgesToDelete(stepInnerResult: StepResult, requestTs: Long): StepResult = { + val filtered = stepInnerResult.edgeWithScores.filter { edgeWithScore => + (edgeWithScore.edge.ts < requestTs) && !edgeWithScore.edge.isDegree + } + if (filtered.isEmpty) StepResult.Empty + else { + val head = filtered.head + val label = head.edge.label + val edgeWithScoreLs = filtered.map { edgeWithScore => + val (newOp, newVersion, newPropsWithTs) = label.consistencyLevel match { + case "strong" => + val _newPropsWithTs = edgeWithScore.edge.propsWithTs ++ + Map(LabelMeta.timestamp -> InnerValLikeWithTs.withLong(requestTs, requestTs, label.schemaVersion)) + (GraphUtil.operations("delete"), requestTs, _newPropsWithTs) + case _ => + val oldEdge = edgeWithScore.edge + (oldEdge.op, oldEdge.version, oldEdge.propsWithTs) + } + + val copiedEdge = + edgeWithScore.edge.copy(op = newOp, version = newVersion, propsWithTs = newPropsWithTs) + + val edgeToDelete = edgeWithScore.copy(edge = copiedEdge) + // logger.debug(s"delete edge from deleteAll: ${edgeToDelete.edge.toLogString}") + edgeToDelete + } + //Degree edge? + StepResult(edgeWithScores = edgeWithScoreLs, grouped = Nil, degreeEdges = Nil, false) + } + } + + // def deleteAllAdjacentEdges(srcVertices: List[Vertex], labels: Seq[Label], dir: Int, ts: Long): Future[Boolean] = + // storage.deleteAllAdjacentEdges(srcVertices, labels, dir, ts) + + def mutateElements(elements: Seq[GraphElement], + withWait: Boolean = false): Future[Seq[Boolean]] = { + + val edgeBuffer = ArrayBuffer[(Edge, Int)]() + val vertexBuffer = ArrayBuffer[(Vertex, Int)]() + + elements.zipWithIndex.foreach { + case (e: Edge, idx: Int) => edgeBuffer.append((e, idx)) + case (v: Vertex, idx: Int) => vertexBuffer.append((v, idx)) + case any@_ => logger.error(s"Unknown type: ${any}") + } + + val edgeFutureWithIdx = mutateEdges(edgeBuffer.map(_._1), withWait).map { result => + edgeBuffer.map(_._2).zip(result) + } + + val vertexFutureWithIdx = mutateVertices(vertexBuffer.map(_._1), withWait).map { result => + vertexBuffer.map(_._2).zip(result) + } + + val graphFuture = for { + edgesMutated <- edgeFutureWithIdx + verticesMutated <- vertexFutureWithIdx + } yield (edgesMutated ++ verticesMutated).sortBy(_._1).map(_._2) + + graphFuture + + } + + // def mutateEdges(edges: Seq[Edge], withWait: Boolean = false): Future[Seq[Boolean]] = storage.mutateEdges(edges, withWait) + + def mutateEdges(edges: Seq[Edge], withWait: Boolean = false): Future[Seq[Boolean]] = { + val edgeWithIdxs = edges.zipWithIndex + + val (strongEdges, weakEdges) = + edgeWithIdxs.partition { case (edge, idx) => + val e = edge + e.label.consistencyLevel == "strong" && e.op != GraphUtil.operations("insertBulk") + } + + val weakEdgesFutures = weakEdges.groupBy { case (edge, idx) => edge.label.hbaseZkAddr }.map { case (zkQuorum, edgeWithIdxs) => + val futures = edgeWithIdxs.groupBy(_._1.label).map { case (label, edgeGroup) => + val storage = getStorage(label) + val edges = edgeGroup.map(_._1) + val idxs = edgeGroup.map(_._2) + + /** multiple edges with weak consistency level will be processed as batch */ + val mutations = edges.flatMap { edge => + val (_, edgeUpdate) = + if (edge.op == GraphUtil.operations("delete")) Edge.buildDeleteBulk(None, edge) + else Edge.buildOperation(None, Seq(edge)) + + storage.buildVertexPutsAsync(edge) ++ storage.indexedEdgeMutations(edgeUpdate) ++ storage.snapshotEdgeMutations(edgeUpdate) ++ storage.increments(edgeUpdate) + } + + storage.writeToStorage(zkQuorum, mutations, withWait).map { ret => + idxs.map(idx => idx -> ret) + } + } + Future.sequence(futures) + } + val (strongDeleteAll, strongEdgesAll) = strongEdges.partition { case (edge, idx) => edge.op == GraphUtil.operations("deleteAll") } + + val deleteAllFutures = strongDeleteAll.map { case (edge, idx) => + deleteAllAdjacentEdges(Seq(edge.srcVertex), Seq(edge.label), edge.labelWithDir.dir, edge.ts).map(idx -> _) + } + + val strongEdgesFutures = strongEdgesAll.groupBy { case (edge, idx) => edge.label }.map { case (label, edgeGroup) => + val edges = edgeGroup.map(_._1) + val idxs = edgeGroup.map(_._2) + val storage = getStorage(label) + storage.mutateStrongEdges(edges, withWait = true).map { rets => + idxs.zip(rets) + } + } + + for { + weak <- Future.sequence(weakEdgesFutures) + deleteAll <- Future.sequence(deleteAllFutures) + strong <- Future.sequence(strongEdgesFutures) + } yield { + (deleteAll ++ weak.flatten.flatten ++ strong.flatten).sortBy(_._1).map(_._2) + } + } + + def mutateVertices(vertices: Seq[Vertex], withWait: Boolean = false): Future[Seq[Boolean]] = { + val verticesWithIdx = vertices.zipWithIndex + val futures = verticesWithIdx.groupBy { case (v, idx) => v.service }.map { case (service, vertexGroup) => + getStorage(service).mutateVertices(vertexGroup.map(_._1), withWait).map(_.zip(vertexGroup.map(_._2))) + } + Future.sequence(futures).map { ls => ls.flatten.toSeq.sortBy(_._2).map(_._1) } + } + + def incrementCounts(edges: Seq[Edge], withWait: Boolean): Future[Seq[(Boolean, Long, Long)]] = { + val edgesWithIdx = edges.zipWithIndex + val futures = edgesWithIdx.groupBy { case (e, idx) => e.label }.map { case (label, edgeGroup) => + getStorage(label).incrementCounts(edgeGroup.map(_._1), withWait).map(_.zip(edgeGroup.map(_._2))) + } + Future.sequence(futures).map { ls => ls.flatten.toSeq.sortBy(_._2).map(_._1) } + } + + def updateDegree(edge: Edge, degreeVal: Long = 0): Future[Boolean] = { + val label = edge.label + + val storage = getStorage(label) + val kvs = storage.buildDegreePuts(edge, degreeVal) + + storage.writeToStorage(edge.label.service.cluster, kvs, withWait = true) + } def shutdown(): Unit = { - storage.flush() + flushStorage() Model.shutdown() } + + def addEdge(srcId: Any, + tgtId: Any, + labelName: String, + direction: String = "out", + props: Map[String, Any] = Map.empty, + ts: Long = System.currentTimeMillis(), + operation: String = "insert", + withWait: Boolean = true): Future[Boolean] = { + + val innerEdges = Seq(Edge.toEdge(srcId, tgtId, labelName, direction, props.toMap, ts, operation)) + mutateEdges(innerEdges, withWait).map(_.headOption.getOrElse(false)) + } + + def addVertex(serviceName: String, + columnName: String, + id: Any, + props: Map[String, Any] = Map.empty, + ts: Long = System.currentTimeMillis(), + operation: String = "insert", + withWait: Boolean = true): Future[Boolean] = { + val innerVertices = Seq(Vertex.toVertex(serviceName, columnName, id, props.toMap, ts, operation)) + mutateVertices(innerVertices, withWait).map(_.headOption.getOrElse(false)) + } }
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/66bdf1bc/s2core/src/main/scala/org/apache/s2graph/core/GraphExceptions.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/GraphExceptions.scala b/s2core/src/main/scala/org/apache/s2graph/core/GraphExceptions.scala index 2f090cf..2cc1063 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/GraphExceptions.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/GraphExceptions.scala @@ -20,6 +20,24 @@ package org.apache.s2graph.core object GraphExceptions { + var fillStckTrace = true + class BaseException(msg : String) extends Exception(msg){ + override def fillInStackTrace : Exception = { + if(fillStckTrace) super.fillInStackTrace() + this + } + } + class NoStackException(msg : String) extends Exception(msg){ + override def fillInStackTrace : Exception = { + this + } + } + + class NoStackCauseException(msg : String, ex: Throwable ) extends Exception(msg, ex){ + override def fillInStackTrace : Exception = { + this + } + } case class JsonParseException(msg: String) extends Exception(msg) @@ -43,7 +61,11 @@ object GraphExceptions { case class InvalidHTableException(msg: String) extends Exception(msg) - case class FetchTimeoutException(msg: String) extends Exception(msg) + case class FetchTimeoutException(msg: String) extends NoStackException(msg) case class DropRequestException(msg: String) extends Exception(msg) + + case class FetchAllStepFailException(msg: String) extends Exception(msg) + + case class AccessDeniedException(amsg: String) extends Exception(amsg) } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/66bdf1bc/s2core/src/main/scala/org/apache/s2graph/core/GraphUtil.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/GraphUtil.scala b/s2core/src/main/scala/org/apache/s2graph/core/GraphUtil.scala index ebfee7a..939b596 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/GraphUtil.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/GraphUtil.scala @@ -50,8 +50,8 @@ object GraphUtil { val d = direction.trim().toLowerCase match { case "directed" | "d" => Some(0) case "undirected" | "u" => Some(2) - case "out" => Some(0) - case "in" => Some(1) + case "out" | "o" => Some(0) + case "in" | "i" => Some(1) case _ => None } d.map(x => x.toByte) @@ -61,8 +61,8 @@ object GraphUtil { direction.trim().toLowerCase match { case "directed" | "d" => 0 case "undirected" | "u" => 2 - case "out" => 0 - case "in" => 1 + case "out" | "o" => 0 + case "in" | "i" => 1 case _ => 2 } } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/66bdf1bc/s2core/src/main/scala/org/apache/s2graph/core/Management.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/Management.scala b/s2core/src/main/scala/org/apache/s2graph/core/Management.scala index 9ef7c14..89acc63 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/Management.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/Management.scala @@ -25,6 +25,7 @@ import org.apache.s2graph.core.mysqls._ import org.apache.s2graph.core.types.HBaseType._ import org.apache.s2graph.core.types._ import org.apache.s2graph.core.JSONParser._ +import org.apache.s2graph.core.utils.logger import play.api.libs.json.Reads._ import play.api.libs.json._ @@ -104,8 +105,8 @@ object Management { } } - def findLabel(labelName: String): Option[Label] = { - Label.findByName(labelName, useCache = false) + def findLabel(labelName: String, useCache: Boolean = false): Option[Label] = { + Label.findByName(labelName, useCache = useCache) } def deleteLabel(labelName: String) = { @@ -117,6 +118,16 @@ object Management { } } + def markDeletedLabel(labelName: String) = { + Model withTx { implicit session => + Label.findByName(labelName, useCache = false).foreach { label => + // rename & delete_at column filled with current time + Label.markDeleted(label) + } + labelName + } + } + def addIndex(labelStr: String, indices: Seq[Index]): Try[Label] = { Model withTx { implicit session => val label = Label.findByName(labelStr).getOrElse(throw LabelNotExistException(s"$labelStr not found")) @@ -205,12 +216,12 @@ object Management { } - def toProps(label: Label, js: Seq[(String, JsValue)]): Seq[(Byte, InnerValLike)] = { + def toProps(label: Label, js: Seq[(String, JsValue)]): Seq[(LabelMeta, InnerValLike)] = { val props = for { (k, v) <- js meta <- label.metaPropsInvMap.get(k) innerVal <- jsValueToInnerVal(v, meta.dataType, label.schemaVersion) - } yield (meta.seq, innerVal) + } yield (meta, innerVal) props } @@ -248,15 +259,18 @@ object Management { class Management(graph: Graph) { import Management._ - val storage = graph.storage - def createTable(zkAddr: String, + def createStorageTable(zkAddr: String, tableName: String, cfs: List[String], regionMultiplier: Int, ttl: Option[Int], - compressionAlgorithm: String = DefaultCompressionAlgorithm): Unit = - storage.createTable(zkAddr, tableName, cfs, regionMultiplier, ttl, compressionAlgorithm) + compressionAlgorithm: String = DefaultCompressionAlgorithm, + replicationScopeOpt: Option[Int] = None, + totalRegionCount: Option[Int] = None): Unit = { + graph.defaultStorage.createTable(zkAddr, tableName, cfs, regionMultiplier, ttl, compressionAlgorithm, replicationScopeOpt, totalRegionCount) + } + /** HBase specific code */ def createService(serviceName: String, @@ -265,9 +279,9 @@ class Management(graph: Graph) { compressionAlgorithm: String = DefaultCompressionAlgorithm): Try[Service] = { Model withTx { implicit session => - val service = Service.findOrInsert(serviceName, cluster, hTableName, preSplitSize, hTableTTL, compressionAlgorithm) - /* create hbase table for service */ - storage.createTable(cluster, hTableName, List("e", "v"), preSplitSize, hTableTTL, compressionAlgorithm) + val service = Service.findOrInsert(serviceName, cluster, hTableName, preSplitSize, hTableTTL.orElse(Some(Integer.MAX_VALUE)), compressionAlgorithm) + /** create hbase table for service */ + graph.getStorage(service).createTable(service.cluster, service.hTableName, List("e", "v"), service.preSplitSize, service.hTableTTL, compressionAlgorithm) service } } @@ -292,35 +306,26 @@ class Management(graph: Graph) { compressionAlgorithm: String = "gz", options: Option[String]): Try[Label] = { - val labelOpt = Label.findByName(label, useCache = false) + if (label.length > LABEL_NAME_MAX_LENGTH ) throw new LabelNameTooLongException(s"Label name ${label} too long.( max length : ${LABEL_NAME_MAX_LENGTH}} )") + if (hTableName.isEmpty && hTableTTL.isDefined) throw new RuntimeException("if want to specify ttl, give hbaseTableName also") + val labelOpt = Label.findByName(label, useCache = false) Model withTx { implicit session => - labelOpt match { - case Some(l) => - throw new GraphExceptions.LabelAlreadyExistException(s"Label name ${l.label} already exist.") - case None => - /** create all models */ - if (label.length > LABEL_NAME_MAX_LENGTH ) throw new LabelNameTooLongException(s"Label name ${label} too long.( max length : 40 )") - val newLabel = Label.insertAll(label, - srcServiceName, srcColumnName, srcColumnType, - tgtServiceName, tgtColumnName, tgtColumnType, - isDirected, serviceName, indices, props, consistencyLevel, - hTableName, hTableTTL, schemaVersion, isAsync, compressionAlgorithm, options) - - /* create hbase table */ - val service = newLabel.service - (hTableName, hTableTTL) match { - case (None, None) => // do nothing - case (None, Some(hbaseTableTTL)) => throw new RuntimeException("if want to specify ttl, give hbaseTableName also") - case (Some(hbaseTableName), None) => - // create own hbase table with default ttl on service level. - storage.createTable(service.cluster, hbaseTableName, List("e", "v"), service.preSplitSize, service.hTableTTL, compressionAlgorithm) - case (Some(hbaseTableName), Some(hbaseTableTTL)) => - // create own hbase table with own ttl. - storage.createTable(service.cluster, hbaseTableName, List("e", "v"), service.preSplitSize, hTableTTL, compressionAlgorithm) - } - newLabel - } + if (labelOpt.isDefined) throw new LabelAlreadyExistException(s"Label name ${label} already exist.") + + /** create all models */ + val newLabel = Label.insertAll(label, + srcServiceName, srcColumnName, srcColumnType, + tgtServiceName, tgtColumnName, tgtColumnType, + isDirected, serviceName, indices, props, consistencyLevel, + hTableName, hTableTTL, schemaVersion, isAsync, compressionAlgorithm, options) + + /** create hbase table */ + val storage = graph.getStorage(newLabel) + val service = newLabel.service + storage.createTable(service.cluster, newLabel.hbaseTableName, List("e", "v"), service.preSplitSize, newLabel.hTableTTL, newLabel.compressionAlgorithm) + + newLabel } } @@ -331,12 +336,11 @@ class Management(graph: Graph) { * copy label when if oldLabel exist and newLabel do not exist. * copy label: only used by bulk load job. not sure if we need to parameterize hbase cluster. */ - def copyLabel(oldLabelName: String, newLabelName: String, hTableName: Option[String]) = { + def copyLabel(oldLabelName: String, newLabelName: String, hTableName: Option[String]): Try[Label] = { val old = Label.findByName(oldLabelName, useCache = false).getOrElse(throw new LabelNotExistException(s"Old label $oldLabelName not exists.")) - if (Label.findByName(newLabelName, useCache = false).isDefined) throw new LabelAlreadyExistException(s"New label $newLabelName already exists.") - val allProps = old.metas.map { labelMeta => Prop(labelMeta.name, labelMeta.defaultValue, labelMeta.dataType) } - val allIndices = old.indices.map { index => Index(index.name, index.propNames) } + val allProps = old.metas(useCache = false).map { labelMeta => Prop(labelMeta.name, labelMeta.defaultValue, labelMeta.dataType) } + val allIndices = old.indices(useCache = false).map { index => Index(index.name, index.propNames) } createLabel(newLabelName, old.srcService.serviceName, old.srcColumnName, old.srcColumnType, old.tgtService.serviceName, old.tgtColumnName, old.tgtColumnType, @@ -344,4 +348,13 @@ class Management(graph: Graph) { allIndices, allProps, old.consistencyLevel, hTableName, old.hTableTTL, old.schemaVersion, old.isAsync, old.compressionAlgorithm, old.options) } + + def getCurrentStorageInfo(labelName: String): Try[Map[String, String]] = for { + label <- Try(Label.findByName(labelName, useCache = false).get) + } yield { + val storage = graph.getStorage(label) + storage.info + } + } + http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/66bdf1bc/s2core/src/main/scala/org/apache/s2graph/core/PostProcess.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/PostProcess.scala b/s2core/src/main/scala/org/apache/s2graph/core/PostProcess.scala index 7cc2420..6c8563c 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/PostProcess.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/PostProcess.scala @@ -19,12 +19,18 @@ package org.apache.s2graph.core -import org.apache.s2graph.core.GraphExceptions.BadQueryException +import java.util.Base64 + +import com.google.protobuf.ByteString +import org.apache.s2graph.core.GraphExceptions.{BadQueryException, LabelNotExistException} import org.apache.s2graph.core.mysqls._ -import org.apache.s2graph.core.types.{InnerVal, InnerValLike} +import org.apache.s2graph.core.types.{InnerVal, InnerValLike, InnerValLikeWithTs} import org.apache.s2graph.core.JSONParser._ +import org.apache.s2graph.core.rest.RequestParser +import org.apache.s2graph.core.utils.logger import play.api.libs.json.{Json, _} +import scala.collection.JavaConversions._ import scala.collection.immutable import scala.collection.mutable.{ArrayBuffer, ListBuffer} @@ -40,74 +46,100 @@ object PostProcess { * Result Entity score field name */ val emptyDegrees = Seq.empty[JsValue] - val timeoutResults = Json.obj("size" -> 0, "degrees" -> Json.arr(), "results" -> Json.arr(), "isTimeout" -> true) - val emptyResults = Json.obj("size" -> 0, "degrees" -> Json.arr(), "results" -> Json.arr(), "isEmpty" -> true) + val emptyResults = Json.obj("size" -> 0, "degrees" -> Json.arr(), "results" -> Json.arr(), "isEmpty" -> true, "rpcFail" -> 0) + def badRequestResults(ex: => Exception) = ex match { case ex: BadQueryException => Json.obj("message" -> ex.msg) case _ => Json.obj("message" -> ex.getMessage) } - val SCORE_FIELD_NAME = "scoreSum" - val reservedColumns = Set("cacheRemain", "from", "to", "label", "direction", "_timestamp", "timestamp", "score", "props") - - def s2EdgeParent(graph: Graph, + queryOption: QueryOption, parentEdges: Seq[EdgeWithScore]): JsValue = { if (parentEdges.isEmpty) JsNull else { val ancestors = for { current <- parentEdges - parents = s2EdgeParent(graph, current.edge.parentEdges) if parents != JsNull + parents = s2EdgeParent(graph, queryOption, current.edge.parentEdges) if parents != JsNull } yield { val s2Edge = current.edge.originalEdgeOpt.getOrElse(current.edge) - s2EdgeToJsValue(s2Edge, current.score, false, parents = parents) + s2EdgeToJsValue(queryOption, current.copy(edge = s2Edge), false, parents = parents, checkSelectColumns = true) } Json.toJson(ancestors) } } - def s2EdgeToJsValue(s2Edge: Edge, - score: Double, + def s2EdgeToJsValue(queryOption: QueryOption, + edgeWithScore: EdgeWithScore, isDegree: Boolean = false, - parents: JsValue = JsNull): JsValue = { + parents: JsValue = JsNull, + checkSelectColumns: Boolean = false): JsValue = { + // val builder = immutable.Map.newBuilder[String, JsValue] + val builder = ArrayBuffer.empty[(String, JsValue)] + val s2Edge = edgeWithScore.edge + val score = edgeWithScore.score + val label = edgeWithScore.label if (isDegree) { - Json.obj( - "from" -> anyValToJsValue(s2Edge.srcId), - "label" -> s2Edge.labelName, - LabelMeta.degree.name -> anyValToJsValue(s2Edge.propsWithTs(LabelMeta.degreeSeq).innerVal.value) - ) + builder += ("from" -> anyValToJsValue(s2Edge.srcId).get) + builder += ("label" -> anyValToJsValue(label.label).get) + builder += ("direction" -> anyValToJsValue(s2Edge.direction).get) + builder += (LabelMeta.degree.name -> anyValToJsValue(s2Edge.propsWithTs(LabelMeta.degree).innerVal.value).get) + JsObject(builder) } else { - Json.obj("from" -> anyValToJsValue(s2Edge.srcId), - "to" -> anyValToJsValue(s2Edge.tgtId), - "label" -> s2Edge.labelName, - "score" -> score, - "props" -> JSONParser.propertiesToJson(s2Edge.properties), - "direction" -> s2Edge.direction, - "timestamp" -> anyValToJsValue(s2Edge.ts), - "parents" -> parents - ) - } - } + if (queryOption.withScore) builder += ("score" -> anyValToJsValue(score).get) - def withImpressionId(queryOption: QueryOption, - size: Int, - degrees: Seq[JsValue], - results: Seq[JsValue]): JsValue = { - queryOption.impIdOpt match { - case None => Json.obj( - "size" -> size, - "degrees" -> degrees, - "results" -> results - ) - case Some(impId) => - Json.obj( - "size" -> size, - "degrees" -> degrees, - "results" -> results, - Experiment.ImpressionKey -> impId - ) + if (queryOption.selectColumns.isEmpty) { + builder += ("from" -> anyValToJsValue(s2Edge.srcId).get) + builder += ("to" -> anyValToJsValue(s2Edge.tgtId).get) + builder += ("label" -> anyValToJsValue(label.label).get) + + val innerProps = ArrayBuffer.empty[(String, JsValue)] + for { + (labelMeta, v) <- edgeWithScore.edge.propsWithTs + jsValue <- anyValToJsValue(v.innerVal.value) + } { + innerProps += (labelMeta.name -> jsValue) + } + + + builder += ("props" -> JsObject(innerProps)) + builder += ("direction" -> anyValToJsValue(s2Edge.direction).get) + builder += ("timestamp" -> anyValToJsValue(s2Edge.tsInnerVal).get) + builder += ("_timestamp" -> anyValToJsValue(s2Edge.tsInnerVal).get) // backward compatibility + if (parents != JsNull) builder += ("parents" -> parents) + // Json.toJson(builder.result()) + JsObject(builder) + } else { + queryOption.selectColumnsMap.foreach { case (columnName, _) => + columnName match { + case "from" => builder += ("from" -> anyValToJsValue(s2Edge.srcId).get) + case "_from" => builder += ("_from" -> anyValToJsValue(s2Edge.srcId).get) + case "to" => builder += ("to" -> anyValToJsValue(s2Edge.tgtId).get) + case "_to" => builder += ("_to" -> anyValToJsValue(s2Edge.tgtId).get) + case "label" => builder += ("label" -> anyValToJsValue(label.label).get) + case "direction" => builder += ("direction" -> anyValToJsValue(s2Edge.direction).get) + case "timestamp" => builder += ("timestamp" -> anyValToJsValue(s2Edge.tsInnerVal).get) + case "_timestamp" => builder += ("_timestamp" -> anyValToJsValue(s2Edge.tsInnerVal).get) + case _ => // should not happen + + } + } + val innerProps = ArrayBuffer.empty[(String, JsValue)] + for { + (labelMeta, v) <- edgeWithScore.edge.propsWithTs + if !checkSelectColumns || queryOption.selectColumnsMap.contains(labelMeta.name) + jsValue <- anyValToJsValue(v.innerVal.value) + } { + innerProps += (labelMeta.name -> jsValue) + } + + builder += ("props" -> JsObject(innerProps)) + if (parents != JsNull) builder += ("parents" -> parents) + JsObject(builder) + } } } + def s2VertexToJson(s2Vertex: Vertex): Option[JsValue] = { val props = for { (k, v) <- s2Vertex.properties @@ -140,6 +172,7 @@ object PostProcess { val kvs = new ArrayBuffer[(String, JsValue)]() + kvs.append("size" -> JsNumber(size)) kvs.append("degrees" -> JsArray(degrees)) kvs.append("results" -> JsArray(results)) @@ -149,28 +182,71 @@ object PostProcess { JsObject(kvs) } - def toJson(graph: Graph, - queryOption: QueryOption, - stepResult: StepResult): JsValue = { + def buildJsonWith(js: JsValue)(implicit fn: (String, JsValue) => JsValue): JsValue = js match { + case JsObject(obj) => JsObject(obj.map { case (k, v) => k -> buildJsonWith(fn(k, v)) }) + case JsArray(arr) => JsArray(arr.map(buildJsonWith(_))) + case _ => js + } + + def toJson(orgQuery: Option[JsValue])(graph: Graph, + queryOption: QueryOption, + stepResult: StepResult): JsValue = { + + // [[cursor, cursor], [cursor]] + lazy val cursors: Seq[Seq[String]] = stepResult.accumulatedCursors.map { stepCursors => + stepCursors.map { cursor => new String(Base64.getEncoder.encode(cursor)) } + } + + lazy val cursorJson: JsValue = Json.toJson(cursors) + + // build nextQuery with (original query + cursors) + lazy val nextQuery: Option[JsValue] = { + if (cursors.exists { stepCursors => stepCursors.exists(_ != "") }) { + val cursorIter = cursors.iterator + orgQuery.map { query => + buildJsonWith(query) { (key, js) => + if (key == "step") { + val currentCursor = cursorIter.next + val res = js.as[Seq[JsObject]].toStream.zip(currentCursor).filterNot(_._2 == "").map { case (obj, cursor) => + val label = (obj \ "label").as[String] + if (Label.findByName(label).get.schemaVersion == "v4") obj + ("cursor" -> JsString(cursor)) + else { + val limit = (obj \ "limit").asOpt[Int].getOrElse(RequestParser.defaultLimit) + val offset = (obj \ "offset").asOpt[Int].getOrElse(0) + obj + ("offset" -> JsNumber(offset + limit)) + } + } + JsArray(res) + } else js + } + } + } else Option(JsNull) + } + + val limitOpt = queryOption.limitOpt + val selectColumns = queryOption.selectColumnsMap val degrees = - if (queryOption.returnDegree) stepResult.degreeEdges.map(t => s2EdgeToJsValue(t.s2Edge, t.score, true)) + if (queryOption.returnDegree) stepResult.degreeEdges.map(t => s2EdgeToJsValue(queryOption, t, true, JsNull)) else emptyDegrees if (queryOption.groupBy.keys.isEmpty) { // no group by specified on query. + val results = if (limitOpt.isDefined) stepResult.edgeWithScores.take(limitOpt.get) else stepResult.edgeWithScores + val ls = results.map { t => + val parents = if (queryOption.returnTree) s2EdgeParent(graph, queryOption, t.edge.parentEdges) else JsNull - val ls = stepResult.results.map { t => - val parents = if (queryOption.returnTree) s2EdgeParent(graph, t.parentEdges) else JsNull - s2EdgeToJsValue(t.s2Edge, t.score, false, parents) + s2EdgeToJsValue(queryOption, t, false, parents) } - withImpressionId(queryOption, ls.size, degrees, ls) + + withOptionalFields(queryOption, ls.size, degrees, ls, stepResult.failCount, cursorJson, nextQuery) } else { + val grouped = if (limitOpt.isDefined) stepResult.grouped.take(limitOpt.get) else stepResult.grouped val results = for { - (groupByValues, (scoreSum, edges)) <- stepResult.grouped + (groupByValues, (scoreSum, edges)) <- grouped } yield { val groupByKeyValues = queryOption.groupBy.keys.zip(groupByValues).map { case (k, valueOpt) => k -> valueOpt.flatMap(anyValToJsValue).getOrElse(JsNull) @@ -185,8 +261,8 @@ object PostProcess { ) } else { val agg = edges.map { t => - val parents = if (queryOption.returnTree) s2EdgeParent(graph, t.parentEdges) else JsNull - s2EdgeToJsValue(t.s2Edge, t.score, false, parents) + val parents = if (queryOption.returnTree) s2EdgeParent(graph, queryOption, t.edge.parentEdges) else JsNull + s2EdgeToJsValue(queryOption, t, false, parents) } val aggJson = Json.toJson(agg) Json.obj( @@ -196,7 +272,8 @@ object PostProcess { ) } } - withImpressionId(queryOption, results.size, degrees, results) + + withOptionalFields(queryOption, results.size, degrees, results, stepResult.failCount, cursorJson, nextQuery) } } -} +} \ No newline at end of file
