http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/66bdf1bc/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala b/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala index d77ac7d..5466a9a 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala @@ -19,57 +19,82 @@ package org.apache.s2graph.core.rest + import java.util.concurrent.{Callable, TimeUnit} + import com.google.common.cache.CacheBuilder -import com.typesafe.config.Config import org.apache.s2graph.core.GraphExceptions.{BadQueryException, ModelNotFoundException} +import org.apache.s2graph.core.JSONParser._ import org.apache.s2graph.core._ import org.apache.s2graph.core.mysqls._ import org.apache.s2graph.core.parsers.{Where, WhereParser} import org.apache.s2graph.core.types._ -import org.apache.s2graph.core.JSONParser._ import play.api.libs.json._ -import play.api.libs.json.Reads._ -import scala.util.{Failure, Success, Try} + +import scala.util.{Random, Failure, Success, Try} object TemplateHelper { val findVar = """\"?\$\{(.*?)\}\"?""".r - val num = """(next_day|next_hour|next_week|now)?\s*(-?\s*[0-9]+)?\s*(hour|day|week)?""".r + val num = """(next_minute|next_day|next_hour|next_week|now)?\s*(-?\s*[0-9]+)?\s*(minute|hour|day|week)?""".r + val randIntRegex = """randint\((.*,.*)\)""".r - val hour = 60 * 60 * 1000L - val day = hour * 24L - val week = day * 7L + val minute: Long = 60 * 1000L + val hour = 60 * minute + val day = 24 * hour + val week = 7 * day def calculate(now: Long, n: Int, unit: String): Long = { val duration = unit match { + case "minute" | "MINUTE" => n * minute case "hour" | "HOUR" => n * hour case "day" | "DAY" => n * day case "week" | "WEEK" => n * week - case _ => n * day + case _ => n } duration + now } + def randInt(s: String): Long = { + val tokens = s.split(",").map(_.trim) + if (tokens.length != 2) throw new RuntimeException(s"TemplateHelper.randint has wrong format. $s") + val (from, to) = try { + (tokens.head.toInt, tokens.last.toInt) + } catch { + case e: Exception => throw new RuntimeException(s"TemplateHelper.randint has wrong format. $s") + } + if (from > to) throw new RuntimeException(s"TemplateHelper.randint has wrong format. 
$s") + val diff = to - from + val r = Random.nextInt(diff + 1) + assert(diff >= 0 && diff < Int.MaxValue && from + r < Int.MaxValue) + from + r + } + def replaceVariable(now: Long, body: String): String = { findVar.replaceAllIn(body, m => { val matched = m group 1 + randIntRegex.findFirstMatchIn(matched) match { + case None => + num.replaceSomeIn(matched, m => { + val (_pivot, n, unit) = (m.group(1), m.group(2), m.group(3)) + val ts = _pivot match { + case null => now + case "now" | "NOW" => now + case "next_minute" | "NEXT_MINUTE" => now / minute * minute + minute + case "next_week" | "NEXT_WEEK" => now / week * week + week + case "next_day" | "NEXT_DAY" => now / day * day + day + case "next_hour" | "NEXT_HOUR" => now / hour * hour + hour + } - num.replaceSomeIn(matched, m => { - val (_pivot, n, unit) = (m.group(1), m.group(2), m.group(3)) - val ts = _pivot match { - case null => now - case "now" | "NOW" => now - case "next_week" | "NEXT_WEEK" => now / week * week + week - case "next_day" | "NEXT_DAY" => now / day * day + day - case "next_hour" | "NEXT_HOUR" => now / hour * hour + hour - } - - if (_pivot == null && n == null && unit == null) None - else if (n == null || unit == null) Option(ts.toString) - else Option(calculate(ts, n.replaceAll(" ", "").toInt, unit).toString) - }) + if (_pivot == null && n == null && unit == null) None + else if (n == null || unit == null) Option(ts.toString) + else Option(calculate(ts, n.replaceAll(" ", "").toInt, unit).toString) + }) + case Some(m) => + val range = m group 1 + randInt(range).toString + } }) } } @@ -107,37 +132,37 @@ class RequestParser(graph: Graph) { val DefaultCompressionAlgorithm = config.getString("hbase.table.compression.algorithm") val DefaultPhase = config.getString("phase") val parserCache = CacheBuilder.newBuilder() - .expireAfterAccess(10000, TimeUnit.MILLISECONDS) - .expireAfterWrite(10000, TimeUnit.MILLISECONDS) - .maximumSize(10000) - .initialCapacity(1000) - .build[String, Try[Where]] + .expireAfterAccess(10000, TimeUnit.MILLISECONDS) + .expireAfterWrite(10000, TimeUnit.MILLISECONDS) + .maximumSize(10000) + .initialCapacity(1000) + .build[String, Try[Where]] - private def extractScoring(labelId: Int, value: JsValue) = { + private def extractScoring(label: Label, value: JsValue): Option[Seq[(LabelMeta, Double)]] = { val ret = for { js <- parseOption[JsObject](value, "scoring") } yield { for { (k, v) <- js.fields - labelOrderType <- LabelMeta.findByName(labelId, k) + labelMata <- label.metaPropsInvMap.get(k) } yield { val value = v match { case n: JsNumber => n.as[Double] case _ => throw new Exception("scoring weight should be double.") } - (labelOrderType.seq, value) + (labelMata, value) } } ret } - def extractInterval(label: Label, jsValue: JsValue) = { - def extractKv(js: JsValue) = js match { - case JsObject(map) => map.toSeq + def extractInterval(label: Label, jsValue: JsValue): Option[(Seq[(String, JsValue)], Seq[(String, JsValue)])] = { + def extractKv(js: JsValue): Seq[(String, JsValue)] = js match { + case JsObject(obj) => obj.toSeq case JsArray(arr) => arr.flatMap { - case JsObject(map) => map.toSeq - case _ => throw new RuntimeException(s"cannot support json type: $js") + case JsObject(obj) => obj.toSeq + case _ => throw new RuntimeException(s"cannot support json type $js") } case _ => throw new RuntimeException(s"cannot support json type: $js") } @@ -147,8 +172,8 @@ class RequestParser(graph: Graph) { fromJs <- (js \ "from").asOpt[JsValue] toJs <- (js \ "to").asOpt[JsValue] } yield { - val from = 
Management.toProps(label, extractKv(fromJs)) - val to = Management.toProps(label, extractKv(toJs)) + val from = extractKv(fromJs) + val to = extractKv(toJs) (from, to) } @@ -188,10 +213,10 @@ class RequestParser(graph: Graph) { labelMeta <- LabelMeta.findByName(label.id.get, k) value <- jsValueToInnerVal(v, labelMeta.dataType, label.schemaVersion) } yield { - labelMeta.seq -> value + labelMeta.name -> value } } - ret.map(_.toMap).getOrElse(Map.empty[Byte, InnerValLike]) + ret.map(_.toMap).getOrElse(Map.empty[String, InnerValLike]) } def extractWhere(label: Label, whereClauseOpt: Option[String]): Try[Where] = { @@ -214,6 +239,28 @@ class RequestParser(graph: Graph) { } } + def extractGroupBy(value: Option[JsValue]): GroupBy = value.map { + case obj: JsObject => + val keys = (obj \ "keys").asOpt[Seq[String]].getOrElse(Nil) + val groupByLimit = (obj \ "limit").asOpt[Int].getOrElse(hardLimit) + val minShouldMatchOpt = (obj \ "minimumShouldMatch").asOpt[JsObject].map { o => + val prop = (o \ "prop").asOpt[String].getOrElse("to") + val count = (o \ "count").asOpt[Int].getOrElse(0) + val terms = (o \ "terms").asOpt[Set[JsValue]].getOrElse(Set.empty).map { + case JsString(s) => s + case JsNumber(n) => n + case _ => throw new RuntimeException("not supported data type") + }.map(_.asInstanceOf[Any]) + + MinShouldMatchParam(prop, count, terms) + } + + GroupBy(keys, groupByLimit, minShouldMatch = minShouldMatchOpt) + case arr: JsArray => + val keys = arr.asOpt[Seq[String]].getOrElse(Nil) + GroupBy(keys) + }.getOrElse(GroupBy.Empty) + def toVertices(labelName: String, direction: String, ids: Seq[JsValue]): Seq[Vertex] = { val vertices = for { label <- Label.findByName(labelName).toSeq @@ -228,33 +275,40 @@ class RequestParser(graph: Graph) { } def toMultiQuery(jsValue: JsValue, impIdOpt: Option[String]): MultiQuery = { + val globalQueryOption = toQueryOption(jsValue, impIdOpt) val queries = for { queryJson <- (jsValue \ "queries").asOpt[Seq[JsValue]].getOrElse(Seq.empty) } yield { - toQuery(queryJson, impIdOpt = impIdOpt) + val innerQuery = toQuery(queryJson, impIdOpt = impIdOpt) + val queryOption = innerQuery.queryOption + + if (queryOption.groupBy.keys.nonEmpty) throw new BadQueryException("Group by option is not allowed in multiple queries.") + if (queryOption.orderByKeys.nonEmpty) throw new BadQueryException("Order by option is not allowed in multiple queries.") + + if (globalQueryOption.withScore) innerQuery.copy(queryOption = innerQuery.queryOption.copy(withScore = false)) + else innerQuery + // val innerQuery3 = + // if (globalQueryOption.groupBy.keys.nonEmpty) innerQuery2.copy(queryOption = innerQuery2.queryOption.copy(groupBy = GroupBy.Empty)) + // else innerQuery2 + } val weights = (jsValue \ "weights").asOpt[Seq[Double]].getOrElse(queries.map(_ => 1.0)) - MultiQuery(queries = queries, weights = weights, queryOption = toQueryOption(jsValue, impIdOpt)) + MultiQuery(queries = queries, weights = weights, queryOption = globalQueryOption) } + def toQueryOption(jsValue: JsValue, impIdOpt: Option[String]): QueryOption = { val filterOutFields = (jsValue \ "filterOutFields").asOpt[List[String]].getOrElse(List(LabelMeta.to.name)) - val filterOutQuery = (jsValue \ "filterOut").asOpt[JsValue].map { v => toQuery(v, impIdOpt = impIdOpt) }.map { q => - q.copy(queryOption = q.queryOption.copy(filterOutFields = filterOutFields)) + val filterOutQuery = (jsValue \ "filterOut").asOpt[JsValue].map { v => + toQuery(v, impIdOpt = impIdOpt) + }.map { q => + q.copy(queryOption = q.queryOption.copy(filterOutFields = 
filterOutFields, selectColumns = filterOutFields)) } val removeCycle = (jsValue \ "removeCycle").asOpt[Boolean].getOrElse(true) val selectColumns = (jsValue \ "select").asOpt[List[String]].getOrElse(List.empty) -// val groupByColumns = (jsValue \ "groupBy").asOpt[List[String]].getOrElse(List.empty) - val groupBy = (jsValue \ "groupBy").asOpt[JsValue].getOrElse(JsNull) match { - case obj: JsObject => - val keys = (obj \ "key").asOpt[Seq[String]].getOrElse(Nil) - val groupByLimit = (obj \ "limit").asOpt[Int].getOrElse(hardLimit) - GroupBy(keys, groupByLimit) - case arr: JsArray => - val keys = arr.asOpt[Seq[String]].getOrElse(Nil) - GroupBy(keys) - case _ => GroupBy.Empty - } + + val groupBy = extractGroupBy((jsValue \ "groupBy").asOpt[JsValue]) + val orderByColumns: List[(String, Boolean)] = (jsValue \ "orderBy").asOpt[List[JsObject]].map { jsLs => for { js <- jsLs @@ -266,7 +320,7 @@ class RequestParser(graph: Graph) { } column -> ascending } - }.getOrElse(List("score" -> false, "timestamp" -> false)) + }.getOrElse(Nil) val withScore = (jsValue \ "withScore").asOpt[Boolean].getOrElse(true) val returnTree = (jsValue \ "returnTree").asOpt[Boolean].getOrElse(false) //TODO: Refactor this @@ -274,6 +328,8 @@ class RequestParser(graph: Graph) { val returnAgg = (jsValue \ "returnAgg").asOpt[Boolean].getOrElse(true) val scoreThreshold = (jsValue \ "scoreThreshold").asOpt[Double].getOrElse(Double.MinValue) val returnDegree = (jsValue \ "returnDegree").asOpt[Boolean].getOrElse(true) + val ignorePrevStepCache = (jsValue \ "ignorePrevStepCache").asOpt[Boolean].getOrElse(false) + val shouldPropagateScore = (jsValue \ "shouldPropagateScore").asOpt[Boolean].getOrElse(true) QueryOption(removeCycle = removeCycle, selectColumns = selectColumns, @@ -287,34 +343,25 @@ class RequestParser(graph: Graph) { returnAgg = returnAgg, scoreThreshold = scoreThreshold, returnDegree = returnDegree, - impIdOpt = impIdOpt + impIdOpt = impIdOpt, + ignorePrevStepCache, + shouldPropagateScore ) } def toQuery(jsValue: JsValue, impIdOpt: Option[String]): Query = { try { - val vertices = - (for { - value <- parse[List[JsValue]](jsValue, "srcVertices") - serviceName = parse[String](value, "serviceName") - column = parse[String](value, "columnName") - } yield { - val service = Service.findByName(serviceName).getOrElse(throw BadQueryException("service not found")) - val col = ServiceColumn.find(service.id.get, column).getOrElse(throw BadQueryException("bad column name")) - val (idOpt, idsOpt) = ((value \ "id").asOpt[JsValue], (value \ "ids").asOpt[List[JsValue]]) - for { - idVal <- idOpt ++ idsOpt.toSeq.flatten - - /* bug, need to use labels schemaVersion */ - innerVal <- jsValueToInnerVal(idVal, col.columnType, col.schemaVersion) - } yield { - Vertex(SourceVertexId(col.id.get, innerVal), System.currentTimeMillis()) - } - }).flatten + val vertices = for { + value <- (jsValue \ "srcVertices").asOpt[Seq[JsValue]].getOrElse(Nil) + serviceName <- (value \ "serviceName").asOpt[String].toSeq + columnName <- (value \ "columnName").asOpt[String].toSeq + idJson = (value \ "id").asOpt[JsValue].map(Seq(_)).getOrElse(Nil) + idsJson = (value \ "ids").asOpt[Seq[JsValue]].getOrElse(Nil) + id <- (idJson ++ idsJson).flatMap(jsValueToAny(_).toSeq).distinct + } yield Vertex.toVertex(serviceName, columnName, id) if (vertices.isEmpty) throw BadQueryException("srcVertices`s id is empty") val steps = parse[Vector[JsValue]](jsValue, "steps") - val queryOption = toQueryOption(jsValue, impIdOpt) val querySteps = @@ -350,32 +397,35 @@ class 
RequestParser(graph: Graph) { val queryParams = for { labelGroup <- queryParamJsVals - queryParam <- parseQueryParam(labelGroup) + queryParam <- parseQueryParam(labelGroup, queryOption) } yield { val (_, columnName) = - if (queryParam.labelWithDir.dir == GraphUtil.directions("out")) { + if (queryParam.dir == GraphUtil.directions("out")) { (queryParam.label.srcService.serviceName, queryParam.label.srcColumnName) } else { (queryParam.label.tgtService.serviceName, queryParam.label.tgtColumnName) } //FIXME: - if (stepIdx == 0 && vertices.nonEmpty && !vertices.exists(v => v.serviceColumn.columnName == columnName)) { + if (stepIdx == 0 && vertices.nonEmpty && !vertices.exists(v => v.columnName == columnName)) { throw BadQueryException("srcVertices contains incompatiable serviceName or columnName with first step.") } queryParam } - Step(queryParams.toList, labelWeights = labelWeights, - // scoreThreshold = stepThreshold, + + + val groupBy = extractGroupBy((step \ "groupBy").asOpt[JsValue]) + + Step(queryParams = queryParams, + labelWeights = labelWeights, nextStepScoreThreshold = nextStepScoreThreshold, nextStepLimit = nextStepLimit, - cacheTTL = cacheTTL) + cacheTTL = cacheTTL, + groupBy = groupBy) } - val ret = Query(vertices, querySteps, queryOption) - // logger.debug(ret.toString) - ret + Query(vertices, querySteps, queryOption) } catch { case e: BadQueryException => throw e @@ -386,12 +436,12 @@ class RequestParser(graph: Graph) { } } - private def parseQueryParam(labelGroup: JsValue): Option[QueryParam] = { + private def parseQueryParam(labelGroup: JsValue, queryOption: QueryOption): Option[QueryParam] = { for { labelName <- parseOption[String](labelGroup, "label") } yield { val label = Label.findByName(labelName).getOrElse(throw BadQueryException(s"$labelName not found")) - val direction = parseOption[String](labelGroup, "direction").map(GraphUtil.toDirection(_)).getOrElse(0) + val direction = parseOption[String](labelGroup, "direction").getOrElse("out") val limit = { parseOption[Int](labelGroup, "limit") match { case None => defaultLimit @@ -402,28 +452,24 @@ class RequestParser(graph: Graph) { val offset = parseOption[Int](labelGroup, "offset").getOrElse(0) val interval = extractInterval(label, labelGroup) val duration = extractDuration(label, labelGroup) - val scoring = extractScoring(label.id.get, labelGroup).getOrElse(List.empty[(Byte, Double)]).toList + val scoring = extractScoring(label, labelGroup).getOrElse(Nil).toList val exclude = parseOption[Boolean](labelGroup, "exclude").getOrElse(false) val include = parseOption[Boolean](labelGroup, "include").getOrElse(false) val hasFilter = extractHas(label, labelGroup) - val labelWithDir = LabelWithDirection(label.id.get, direction) - val indexNameOpt = (labelGroup \ "index").asOpt[String] - val indexSeq = indexNameOpt match { - case None => label.indexSeqsMap.get(scoring.map(kv => kv._1)).map(_.seq).getOrElse(LabelIndex.DefaultSeq) - case Some(indexName) => label.indexNameMap.get(indexName).map(_.seq).getOrElse(throw new RuntimeException("cannot find index")) - } + + val indexName = (labelGroup \ "index").asOpt[String].getOrElse(LabelIndex.DefaultName) val whereClauseOpt = (labelGroup \ "where").asOpt[String] val where = extractWhere(label, whereClauseOpt) val includeDegree = (labelGroup \ "includeDegree").asOpt[Boolean].getOrElse(true) val rpcTimeout = (labelGroup \ "rpcTimeout").asOpt[Int].getOrElse(DefaultRpcTimeout) val maxAttempt = (labelGroup \ "maxAttempt").asOpt[Int].getOrElse(DefaultMaxAttempt) - val tgtVertexInnerIdOpt = 
(labelGroup \ "_to").asOpt[JsValue].flatMap { jsVal => - jsValueToInnerVal(jsVal, label.tgtColumnWithDir(direction).columnType, label.schemaVersion) - } + + val tgtVertexInnerIdOpt = (labelGroup \ "_to").asOpt[JsValue].filterNot(_ == JsNull).flatMap(jsValueToAny) + val cacheTTL = (labelGroup \ "cacheTTL").asOpt[Long].getOrElse(-1L) val timeDecayFactor = (labelGroup \ "timeDecay").asOpt[JsObject].map { jsVal => val propName = (jsVal \ "propName").asOpt[String].getOrElse(LabelMeta.timestamp.name) - val propNameSeq = label.metaPropsInvMap.get(propName).map(_.seq).getOrElse(LabelMeta.timeStampSeq) + val propNameSeq = label.metaPropsInvMap.get(propName).getOrElse(LabelMeta.timestamp) val initial = (jsVal \ "initial").asOpt[Double].getOrElse(1.0) val decayRate = (jsVal \ "decayRate").asOpt[Double].getOrElse(0.1) if (decayRate >= 1.0 || decayRate <= 0.0) throw new BadQueryException("decay rate should be 0.0 ~ 1.0") @@ -432,40 +478,36 @@ class RequestParser(graph: Graph) { } val threshold = (labelGroup \ "threshold").asOpt[Double].getOrElse(QueryParam.DefaultThreshold) // TODO: refactor this. dirty - val duplicate = parseOption[String](labelGroup, "duplicate").map(s => Query.DuplicatePolicy(s)) + val duplicate = parseOption[String](labelGroup, "duplicate").map(s => DuplicatePolicy(s)).getOrElse(DuplicatePolicy.First) val outputField = (labelGroup \ "outputField").asOpt[String].map(s => Json.arr(Json.arr(s))) - val transformer = if (outputField.isDefined) outputField else (labelGroup \ "transform").asOpt[JsValue] + val transformer = (if (outputField.isDefined) outputField else (labelGroup \ "transform").asOpt[JsValue]) match { + case None => EdgeTransformer(EdgeTransformer.DefaultJson) + case Some(json) => EdgeTransformer(json) + } val scorePropagateOp = (labelGroup \ "scorePropagateOp").asOpt[String].getOrElse("multiply") val scorePropagateShrinkage = (labelGroup \ "scorePropagateShrinkage").asOpt[Long].getOrElse(500l) val sample = (labelGroup \ "sample").asOpt[Int].getOrElse(-1) val shouldNormalize = (labelGroup \ "normalize").asOpt[Boolean].getOrElse(false) val cursorOpt = (labelGroup \ "cursor").asOpt[String] // FIXME: Order of command matter - QueryParam(labelWithDir) - .sample(sample) - .rank(RankParam(label.id.get, scoring)) - .exclude(exclude) - .include(include) - .duration(duration) - .has(hasFilter) - .labelOrderSeq(indexSeq) - .interval(interval) - .limit(offset, limit) - .where(where) - .duplicatePolicy(duplicate) - .includeDegree(includeDegree) - .rpcTimeout(rpcTimeout) - .maxAttempt(maxAttempt) - .tgtVertexInnerIdOpt(tgtVertexInnerIdOpt) - .cacheTTLInMillis(cacheTTL) - .timeDecay(timeDecayFactor) - .threshold(threshold) - .transformer(transformer) - .scorePropagateOp(scorePropagateOp) - .scorePropagateShrinkage(scorePropagateShrinkage) - .shouldNormalize(shouldNormalize) - .cursorOpt(cursorOpt) + QueryParam(labelName = labelName, + direction = direction, + offset = offset, + limit = limit, + sample = sample, + maxAttempt = maxAttempt, + rpcTimeout = rpcTimeout, + cacheTTLInMillis = cacheTTL, + indexName = indexName, where = where, threshold = threshold, + rank = RankParam(scoring), intervalOpt = interval, durationOpt = duration, + exclude = exclude, include = include, has = hasFilter, duplicatePolicy = duplicate, + includeDegree = includeDegree, scorePropagateShrinkage = scorePropagateShrinkage, + scorePropagateOp = scorePropagateOp, shouldNormalize = shouldNormalize, + whereRawOpt = whereClauseOpt, cursorOpt = cursorOpt, + tgtVertexIdOpt = tgtVertexInnerIdOpt, + edgeTransformer = 
transformer, timeDecay = timeDecayFactor + ) } } @@ -489,6 +531,11 @@ class RequestParser(graph: Graph) { } } + def jsToStr(js: JsValue): String = js match { + case JsString(s) => s + case _ => js.toString() + } + def parseBulkFormat(str: String): Seq[(GraphElement, String)] = { val edgeStrs = str.split("\\n").filterNot(_.isEmpty) val elementsWithTsv = for { @@ -624,30 +671,15 @@ class RequestParser(graph: Graph) { } def toCheckEdgeParam(jsValue: JsValue) = { - val params = jsValue.as[List[JsValue]] - var isReverted = false - val labelWithDirs = scala.collection.mutable.HashSet[LabelWithDirection]() - val quads = for { - param <- params - labelName <- (param \ "label").asOpt[String] - direction <- GraphUtil.toDir((param \ "direction").asOpt[String].getOrElse("out")) - label <- Label.findByName(labelName) - srcId <- jsValueToInnerVal((param \ "from").as[JsValue], label.srcColumnWithDir(direction.toInt).columnType, label.schemaVersion) - tgtId <- jsValueToInnerVal((param \ "to").as[JsValue], label.tgtColumnWithDir(direction.toInt).columnType, label.schemaVersion) + for { + json <- jsValue.asOpt[Seq[JsValue]].getOrElse(Nil) + from <- (json \ "from").asOpt[JsValue].flatMap(jsValueToAny(_)) + to <- (json \ "to").asOpt[JsValue].flatMap(jsValueToAny(_)) + labelName <- (json \ "label").asOpt[String] + direction = (json \ "direction").asOpt[String].getOrElse("out") } yield { - val labelWithDir = LabelWithDirection(label.id.get, direction) - labelWithDirs += labelWithDir - val (src, tgt, dir) = if (direction == 1) { - isReverted = true - (Vertex(VertexId(label.tgtColumnWithDir(direction.toInt).id.get, tgtId)), - Vertex(VertexId(label.srcColumnWithDir(direction.toInt).id.get, srcId)), 0) - } else { - (Vertex(VertexId(label.srcColumnWithDir(direction.toInt).id.get, srcId)), - Vertex(VertexId(label.tgtColumnWithDir(direction.toInt).id.get, tgtId)), 0) - } - (src, tgt, QueryParam(LabelWithDirection(label.id.get, dir))) + Edge.toEdge(from, to, labelName, direction, Map.empty) } - (quads, isReverted) } def toGraphElements(str: String): Seq[GraphElement] = { @@ -671,16 +703,6 @@ class RequestParser(graph: Graph) { (labels, direction, ids, ts, vertices) } - def toFetchAndDeleteParam(json: JsValue) = { - val labelName = (json \ "label").as[String] - val fromOpt = (json \ "from").asOpt[JsValue] - val toOpt = (json \ "to").asOpt[JsValue] - val direction = (json \ "direction").asOpt[String].getOrElse("out") - val indexOpt = (json \ "index").asOpt[String] - val propsOpt = (json \ "props").asOpt[JsObject] - (labelName, fromOpt, toOpt, direction, indexOpt, propsOpt) - } - def parseExperiment(jsQuery: JsValue): Seq[ExperimentParam] = jsQuery.as[Seq[JsObject]].map { obj => def _require(field: String) = throw new RuntimeException(s"${field} not found")
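For reference, a minimal sketch of how the extended TemplateHelper behaves after this change: the num regex now accepts a minute unit alongside hour/day/week, and randint(from, to) draws a uniformly random value from the inclusive range [from, to]. The request bodies below are hypothetical, the printed values depend on now and on the RNG, and the sketch assumes the s2core module is on the classpath.

import org.apache.s2graph.core.rest.TemplateHelper

object TemplateHelperSketch extends App {
  // Fixed "now" (2016-01-01T00:00:00Z) so the time arithmetic is reproducible.
  val now = 1451606400000L

  // The new minute unit: now minus 30 minutes.
  println(TemplateHelper.replaceVariable(now, """{"from": ${now - 30 minute}}"""))

  // next_hour: truncate now to the hour boundary, then add one hour.
  println(TemplateHelper.replaceVariable(now, """{"to": ${next_hour}}"""))

  // randint(1, 10): uniform draw over [1, 10]; from > to raises a RuntimeException.
  println(TemplateHelper.replaceVariable(now, """{"limit": ${randint(1, 10)}}"""))
}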
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/66bdf1bc/s2core/src/main/scala/org/apache/s2graph/core/rest/RestHandler.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/rest/RestHandler.scala b/s2core/src/main/scala/org/apache/s2graph/core/rest/RestHandler.scala index 4c77ad6..099a7f9 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/rest/RestHandler.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/rest/RestHandler.scala @@ -29,6 +29,8 @@ import org.apache.s2graph.core.utils.logger import play.api.libs.json._ import scala.concurrent.{ExecutionContext, Future} +import scala.util.Try +import scala.util.control.NonFatal object RestHandler { trait CanLookup[A] { @@ -56,7 +58,7 @@ class RestHandler(graph: Graph)(implicit ec: ExecutionContext) { import RestHandler._ val requestParser = new RequestParser(graph) - + val querySampleRate: Double = graph.config.getDouble("query.log.sample.rate") /** * Public APIS @@ -69,15 +71,8 @@ class RestHandler(graph: Graph)(implicit ec: ExecutionContext) { val jsQuery = Json.parse(body) uri match { -// case "/graphs/getEdges" => HandlerResult(getEdgesAsync(jsQuery, impIdOpt)(PostProcess.toSimpleVertexArrJson)) - case "/graphs/getEdges" => HandlerResult(getEdgesAsync(jsQuery, impIdOpt)(PostProcess.toJson)) -// case "/graphs/getEdges/grouped" => HandlerResult(getEdgesAsync(jsQuery)(PostProcess.summarizeWithListFormatted)) -// case "/graphs/getEdgesExcluded" => HandlerResult(getEdgesExcludedAsync(jsQuery)(PostProcess.toSimpleVertexArrJson)) -// case "/graphs/getEdgesExcluded/grouped" => HandlerResult(getEdgesExcludedAsync(jsQuery)(PostProcess.summarizeWithListExcludeFormatted)) + case "/graphs/getEdges" => HandlerResult(getEdgesAsync(jsQuery, impIdOpt)(PostProcess.toJson(Option(jsQuery)))) case "/graphs/checkEdges" => checkEdges(jsQuery) -// case "/graphs/getEdgesGrouped" => HandlerResult(getEdgesAsync(jsQuery)(PostProcess.summarizeWithList)) -// case "/graphs/getEdgesGroupedExcluded" => HandlerResult(getEdgesExcludedAsync(jsQuery)(PostProcess.summarizeWithListExclude)) -// case "/graphs/getEdgesGroupedExcludedFormatted" => HandlerResult(getEdgesExcludedAsync(jsQuery)(PostProcess.summarizeWithListExcludeFormatted)) case "/graphs/getVertices" => HandlerResult(getVertices(jsQuery)) case "/graphs/experiments" => experiments(jsQuery) case uri if uri.startsWith("/graphs/experiment") => @@ -93,17 +88,27 @@ class RestHandler(graph: Graph)(implicit ec: ExecutionContext) { // TODO: Refactor to doGet def checkEdges(jsValue: JsValue): HandlerResult = { try { - val (quads, isReverted) = requestParser.toCheckEdgeParam(jsValue) - - HandlerResult(graph.checkEdges(quads).map { case stepResult => - PostProcess.toJson(graph, QueryOption(), stepResult) + val edges = requestParser.toCheckEdgeParam(jsValue) + + HandlerResult(graph.checkEdges(edges).map { case stepResult => + val jsArray = for { + s2EdgeWithScore <- stepResult.edgeWithScores +// json <- PostProcess.s2EdgeToJsValue(QueryOption(), s2EdgeWithScore) + json = PostProcess.s2EdgeToJsValue(QueryOption(), s2EdgeWithScore) + } yield json + Json.toJson(jsArray) }) } catch { - case e: Exception => HandlerResult(Future.failed(e)) + case e: Exception => + logger.error(s"RestHandler#checkEdges error: $e") + HandlerResult(Future.failed(e)) } } + /** + * Private APIS + */ private def experiments(jsQuery: JsValue): HandlerResult = { val params: Seq[RequestParser.ExperimentParam] = requestParser.parseExperiment(jsQuery) 
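As a usage note for the rewritten checkEdges path: toCheckEdgeParam now consumes a JSON array of {from, to, label, direction} objects, and the handler answers with an array of edges serialized through PostProcess.s2EdgeToJsValue. A hedged sketch of such a payload follows; the label name is hypothetical, and direction may be omitted since it defaults to "out".

import play.api.libs.json._

// Hypothetical payload for POST /graphs/checkEdges under the new parser.
val checkEdgesBody: JsValue = Json.arr(
  Json.obj(
    "from"      -> 1,            // source vertex id, converted via jsValueToAny
    "to"        -> 100,          // target vertex id
    "label"     -> "graph_test", // label name, resolved inside Edge.toEdge
    "direction" -> "out"         // optional, defaults to "out"
  )
)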
@@ -120,7 +125,7 @@ class RestHandler(graph: Graph)(implicit ec: ExecutionContext) { HandlerResult(body = result) } - private def experiment(contentsBody: JsValue, accessToken: String, experimentName: String, uuid: String, impKeyOpt: => Option[String] = None): HandlerResult = { + def experiment(contentsBody: JsValue, accessToken: String, experimentName: String, uuid: String, impKeyOpt: => Option[String] = None): HandlerResult = { try { val bucketOpt = for { service <- Service.findByAccessToken(accessToken) @@ -128,9 +133,21 @@ class RestHandler(graph: Graph)(implicit ec: ExecutionContext) { bucket <- experiment.findBucket(uuid, impKeyOpt) } yield bucket - val bucket = bucketOpt.getOrElse(throw new RuntimeException("bucket is not found")) + val bucket = bucketOpt.getOrElse(throw new RuntimeException(s"bucket is not found. $accessToken, $experimentName, $uuid, $impKeyOpt")) if (bucket.isGraphQuery) { val ret = buildRequestInner(contentsBody, bucket, uuid) + + logQuery(Json.obj( + "type" -> "experiment", + "time" -> System.currentTimeMillis(), + "body" -> contentsBody, + "uri" -> Seq("graphs", "experiment", accessToken, experimentName, uuid).mkString("/"), + "accessToken" -> accessToken, + "experimentName" -> experimentName, + "uuid" -> uuid, + "impressionId" -> bucket.impressionId + )) + HandlerResult(ret.body, Experiment.ImpressionKey -> bucket.impressionId) } else throw new RuntimeException("not supported yet") @@ -156,24 +173,39 @@ class RestHandler(graph: Graph)(implicit ec: ExecutionContext) { def getEdgesAsync(jsonQuery: JsValue, impIdOpt: Option[String] = None) (post: (Graph, QueryOption, StepResult) => JsValue): Future[JsValue] = { - jsonQuery match { - case obj@JsObject(_) => - (obj \ "queries").asOpt[JsValue] match { - case None => - val query = requestParser.toQuery(obj, impIdOpt) - graph.getEdges(query).map(post(graph, query.queryOption, _)) - case _ => - val multiQuery = requestParser.toMultiQuery(obj, impIdOpt) - graph.getEdgesMultiQuery(multiQuery).map(post(graph, multiQuery.queryOption, _)) - } - - case JsArray(arr) => - val queries = arr.map(requestParser.toQuery(_, impIdOpt)) - val weights = queries.map(_ => 1.0) - val multiQuery = MultiQuery(queries, weights, QueryOption(), jsonQuery) - graph.getEdgesMultiQuery(multiQuery).map(post(graph, multiQuery.queryOption, _)) - - case _ => throw BadQueryException("Cannot support") + + def query(obj: JsValue): Future[JsValue] = { + (obj \ "queries").asOpt[JsValue] match { + case None => + val s2Query = requestParser.toQuery(obj, impIdOpt) + graph.getEdges(s2Query).map(post(graph, s2Query.queryOption, _)) + case _ => + val multiQuery = requestParser.toMultiQuery(obj, impIdOpt) + graph.getEdgesMultiQuery(multiQuery).map(post(graph, multiQuery.queryOption, _)) + } + } + + logQuery(Json.obj( + "type" -> "getEdges", + "time" -> System.currentTimeMillis(), + "body" -> jsonQuery, + "uri" -> "graphs/getEdges" + )) + + val unionQuery = (jsonQuery \ "union").asOpt[JsObject] + unionQuery match { + case None => jsonQuery match { + case obj@JsObject(_) => query(obj) + case JsArray(arr) => + val res = arr.map(js => query(js.as[JsObject])) + Future.sequence(res).map(JsArray) + case _ => throw BadQueryException("Cannot support") + } + + case Some(jsUnion) => + val (keys, queries) = jsUnion.value.unzip + val futures = queries.map(query) + Future.sequence(futures).map(res => JsObject(keys.zip(res).toSeq)) } } @@ -194,7 +226,6 @@ class RestHandler(graph: Graph)(implicit ec: ExecutionContext) { graph.getVertices(vertices) map { vertices => 
PostProcess.verticesToJson(vertices) } } - private def buildRequestBody(requestKeyJsonOpt: Option[JsValue], bucket: Bucket, uuid: String): String = { var body = bucket.requestBody.replace("#uuid", uuid) @@ -203,19 +234,24 @@ class RestHandler(graph: Graph)(implicit ec: ExecutionContext) { jsObj <- requestKeyJson.asOpt[JsObject] (key, value) <- jsObj.fieldSet } { + val escaped = Json.stringify(value) val replacement = value match { - case JsString(s) => s - case _ => value.toString + case _: JsString => escaped.slice(1, escaped.length - 1) + case _ => escaped } + body = body.replace(key, replacement) } body } - def calcSize(js: JsValue): Int = js match { - case JsObject(obj) => (js \ "size").asOpt[Int].getOrElse(0) - case JsArray(seq) => seq.map(js => (js \ "size").asOpt[Int].getOrElse(0)).sum - case _ => 0 + def calcSize(js: JsValue): Int = + (js \\ "size") map { sizeJs => sizeJs.asOpt[Int].getOrElse(0) } sum + + def logQuery(queryJson: => JsObject): Unit = { + if (scala.util.Random.nextDouble() < querySampleRate) { + logger.query(queryJson.toString) + } } } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/66bdf1bc/s2core/src/main/scala/org/apache/s2graph/core/storage/SKeyValue.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/SKeyValue.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/SKeyValue.scala index c76c25c..403ceeb 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/SKeyValue.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/SKeyValue.scala @@ -24,24 +24,29 @@ import org.hbase.async.KeyValue object SKeyValue { + val EdgeCf = "e".getBytes() val Put = 1 val Delete = 2 val Increment = 3 val Default = Put } + case class SKeyValue(table: Array[Byte], row: Array[Byte], cf: Array[Byte], qualifier: Array[Byte], value: Array[Byte], timestamp: Long, - operation: Int = SKeyValue.Default) { + operation: Int = SKeyValue.Default, + durability: Boolean = true) { def toLogString = { - Map("table" -> table.toList, "row" -> row.toList, "cf" -> Bytes.toString(cf), + Map("table" -> Bytes.toString(table), "row" -> row.toList, "cf" -> Bytes.toString(cf), "qualifier" -> qualifier.toList, "value" -> value.toList, "timestamp" -> timestamp, - "operation" -> operation).mapValues(_.toString).toString + "operation" -> operation, "durability" -> durability).toString } override def toString(): String = toLogString + + def toKeyValue: KeyValue = new KeyValue(row, cf, qualifier, timestamp, value) } trait CanSKeyValue[T] { @@ -64,3 +69,4 @@ object CanSKeyValue { // For hbase KeyValues } + http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/66bdf1bc/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala index a6e81b4..07e39aa 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala @@ -19,53 +19,44 @@ package org.apache.s2graph.core.storage -import java.util.concurrent.{Executors, TimeUnit} -import com.typesafe.config.Config -import org.apache.hadoop.hbase.util.Bytes -import org.apache.s2graph.core.GraphExceptions.FetchTimeoutException +import org.apache.s2graph.core.GraphExceptions.{NoStackException, FetchTimeoutException} import 
org.apache.s2graph.core._ import org.apache.s2graph.core.mysqls.{Label, LabelMeta} +import org.apache.s2graph.core.parsers.WhereParser import org.apache.s2graph.core.storage.serde.indexedge.wide.{IndexEdgeDeserializable, IndexEdgeSerializable} import org.apache.s2graph.core.storage.serde.snapshotedge.wide.SnapshotEdgeDeserializable import org.apache.s2graph.core.storage.serde.vertex.{VertexDeserializable, VertexSerializable} import org.apache.s2graph.core.types._ -import org.apache.s2graph.core.utils.{Extensions, logger} +import org.apache.s2graph.core.utils.{DeferCache, Extensions, logger} -import scala.annotation.tailrec -import scala.collection.Seq -import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable.{ArrayBuffer, ListBuffer} import scala.concurrent.{ExecutionContext, Future, Promise} import scala.util.{Random, Try} +import java.util.concurrent.{Executors, TimeUnit} + +import com.typesafe.config.{Config, ConfigFactory} +import org.apache.hadoop.hbase.util.Bytes + -abstract class Storage[R](val graph: Graph, +abstract class Storage[Q, R](val graph: Graph, val config: Config)(implicit ec: ExecutionContext) { import HBaseType._ + import Graph._ - /** storage dependent configurations */ - val DeleteAllFetchCount = config.getInt("delete.all.fetch.count") - val MaxRetryNum = config.getInt("max.retry.number") - val MaxBackOff = config.getInt("max.back.off") - val BackoffTimeout = config.getInt("back.off.timeout") - val DeleteAllFetchSize = config.getInt("delete.all.fetch.size") - val FailProb = config.getDouble("hbase.fail.prob") - val LockExpireDuration = config.getInt("lock.expire.time") - val MaxSize = config.getInt("future.cache.max.size") - val ExpireAfterWrite = config.getInt("future.cache.expire.after.write") - val ExpireAfterAccess = config.getInt("future.cache.expire.after.access") + val BackoffTimeout = graph.BackoffTimeout + val MaxRetryNum = graph.MaxRetryNum + val MaxBackOff = graph.MaxBackOff + val FailProb = graph.FailProb + val LockExpireDuration = graph.LockExpireDuration + val MaxSize = graph.MaxSize + val ExpireAfterWrite = graph.ExpireAfterWrite + val ExpireAfterAccess = graph.ExpireAfterAccess /** retry scheduler */ val scheduledThreadPool = Executors.newSingleThreadScheduledExecutor() - /** handle mutate failed */ - val exceptionHandler = new ExceptionHandler(config) - val failTopic = s"mutateFailed_${config.getString("phase")}" - - /** fallback */ - val fallback = Future.successful(StepResult.Empty) - val innerFallback = Future.successful(StepInnerResult.Empty) - /** * Compatibility table * | label schema version | snapshot edge | index edge | vertex | note | @@ -109,7 +100,7 @@ abstract class Storage[R](val graph: Graph, * @param vertex: vertex to serialize * @return serializer implementation */ - def vertexSerializer(vertex: Vertex) = new VertexSerializable(vertex) + def vertexSerializer(vertex: Vertex): Serializable[Vertex] = new VertexSerializable(vertex) /** * create deserializer that can parse stored CanSKeyValue into snapshotEdge. @@ -142,7 +133,7 @@ abstract class Storage[R](val graph: Graph, indexEdgeDeserializers.get(schemaVer).getOrElse(throw new RuntimeException(s"not supported version: ${schemaVer}")) /** create deserializer that can parser stored CanSKeyValue into vertex. 
*/ - val vertexDeserializer = new VertexDeserializable + val vertexDeserializer: Deserializable[Vertex] = new VertexDeserializable /** @@ -170,7 +161,7 @@ abstract class Storage[R](val graph: Graph, * @param request * @return */ - def fetchSnapshotEdgeKeyValues(request: AnyRef): Future[Seq[SKeyValue]] + def fetchSnapshotEdgeKeyValues(request: QueryRequest): Future[Seq[SKeyValue]] /** * write requestKeyValue into storage if the current value in storage that is stored matches. @@ -200,10 +191,11 @@ abstract class Storage[R](val graph: Graph, * build proper request which is specific into storage to call fetchIndexEdgeKeyValues or fetchSnapshotEdgeKeyValues. * for example, Asynchbase use GetRequest, Scanner so this method is responsible to build * client request(GetRequest, Scanner) based on user provided query. - * @param queryRequest + * + * @param queryRequest * @return - */ - def buildRequest(queryRequest: QueryRequest): AnyRef + */ + protected def buildRequest(queryRequest: QueryRequest, edge: Edge): Q /** * fetch IndexEdges for given queryParam in queryRequest. @@ -215,53 +207,55 @@ abstract class Storage[R](val graph: Graph, * so single I/O return type should be Deferred[T]. * * if we use native hbase client, then this return type can be Future[T] or just T. - * @param queryRequest - * @param prevStepScore + * + * @param queryRequest * @param isInnerCall * @param parentEdges * @return */ def fetch(queryRequest: QueryRequest, - prevStepScore: Double, isInnerCall: Boolean, parentEdges: Seq[EdgeWithScore]): R /** * responsible to fire parallel fetch call into storage and create future that will return merged result. - * @param queryRequestWithScoreLs + * + * @param queryRequests * @param prevStepEdges * @return */ - def fetches(queryRequestWithScoreLs: Seq[(QueryRequest, Double)], - prevStepEdges: Map[VertexId, Seq[EdgeWithScore]]): Future[Seq[StepInnerResult]] + def fetches(queryRequests: Seq[QueryRequest], + prevStepEdges: Map[VertexId, Seq[EdgeWithScore]]): Future[Seq[StepResult]] /** * fetch Vertex for given request from storage. - * @param request + * + * @param request * @return */ - def fetchVertexKeyValues(request: AnyRef): Future[Seq[SKeyValue]] + def fetchVertexKeyValues(request: QueryRequest): Future[Seq[SKeyValue]] /** * decide how to apply given edges(indexProps values + Map(_count -> countVal)) into storage. - * @param edges + * + * @param edges * @param withWait * @return */ - def incrementCounts(edges: Seq[Edge], withWait: Boolean): Future[Seq[(Boolean, Long)]] + def incrementCounts(edges: Seq[Edge], withWait: Boolean): Future[Seq[(Boolean, Long, Long)]] /** * this method need to be called when client shutdown. this is responsible to cleanUp the resources * such as client into storage. */ def flush(): Unit = { - exceptionHandler.shutdown() } /** * create table on storage. 
* if storage implementation does not support namespace or table, then there is nothing to be done - * @param zkAddr + * + * @param zkAddr * @param tableName * @param cfs * @param regionMultiplier @@ -273,28 +267,29 @@ abstract class Storage[R](val graph: Graph, cfs: List[String], regionMultiplier: Int, ttl: Option[Int], - compressionAlgorithm: String): Unit + compressionAlgorithm: String, + replicationScopeOpt: Option[Int] = None, + totalRegionCount: Option[Int] = None): Unit /** Public Interface */ - def getVertices(vertices: Seq[Vertex]): Future[Seq[Vertex]] = { - def fromResult(queryParam: QueryParam, - kvs: Seq[SKeyValue], + def fromResult(kvs: Seq[SKeyValue], version: String): Option[Vertex] = { if (kvs.isEmpty) None - else vertexDeserializer.fromKeyValues(queryParam, kvs, version, None) + else vertexDeserializer.fromKeyValues(None, kvs, version, None) +// .map(S2Vertex(graph, _)) } val futures = vertices.map { vertex => val queryParam = QueryParam.Empty val q = Query.toQuery(Seq(vertex), queryParam) val queryRequest = QueryRequest(q, stepIdx = -1, vertex, queryParam) - fetchVertexKeyValues(buildRequest(queryRequest)).map { kvs => - fromResult(queryParam, kvs, vertex.serviceColumn.schemaVersion) + fetchVertexKeyValues(queryRequest).map { kvs => + fromResult(kvs, vertex.serviceColumn.schemaVersion) } recoverWith { case ex: Throwable => Future.successful(None) } @@ -302,92 +297,39 @@ abstract class Storage[R](val graph: Graph, Future.sequence(futures).map { result => result.toList.flatten } } - - def mutateElements(elements: Seq[GraphElement], - withWait: Boolean = false): Future[Seq[Boolean]] = { - - val edgeBuffer = ArrayBuffer[Edge]() - val vertexBuffer = ArrayBuffer[Vertex]() - - elements.foreach { - case e: Edge => edgeBuffer += e - case v: Vertex => vertexBuffer += v - case any@_ => logger.error(s"Unknown type: ${any}") - } - - val edgeFuture = mutateEdges(edgeBuffer, withWait) - val vertexFuture = mutateVertices(vertexBuffer, withWait) - - val graphFuture = for { - edgesMutated <- edgeFuture - verticesMutated <- vertexFuture - } yield edgesMutated ++ verticesMutated - - graphFuture - } - - def mutateEdges(edges: Seq[Edge], withWait: Boolean): Future[Seq[Boolean]] = { - val (strongEdges, weakEdges) = - (edges.partition(e => e.label.consistencyLevel == "strong" && e.op != GraphUtil.operations("insertBulk"))) - - val weakEdgesFutures = weakEdges.groupBy { e => e.label.hbaseZkAddr }.map { case (zkQuorum, edges) => - val mutations = edges.flatMap { edge => - val (_, edgeUpdate) = - if (edge.op == GraphUtil.operations("delete")) Edge.buildDeleteBulk(None, edge) - else Edge.buildOperation(None, Seq(edge)) - - buildVertexPutsAsync(edge) ++ indexedEdgeMutations(edgeUpdate) ++ - snapshotEdgeMutations(edgeUpdate) ++ increments(edgeUpdate) - } - writeToStorage(zkQuorum, mutations, withWait) - } - val strongEdgesFutures = mutateStrongEdges(strongEdges, withWait) - for { - weak <- Future.sequence(weakEdgesFutures) - strong <- strongEdgesFutures - } yield { - strong ++ weak - } - } - def mutateStrongEdges(_edges: Seq[Edge], withWait: Boolean): Future[Seq[Boolean]] = { - val grouped = _edges.groupBy { edge => (edge.label, edge.srcVertex.innerId, edge.tgtVertex.innerId) } toSeq + val edgeWithIdxs = _edges.zipWithIndex + val grouped = edgeWithIdxs.groupBy { case (edge, idx) => + (edge.label, edge.srcVertex.innerId, edge.tgtVertex.innerId) + } toSeq val mutateEdges = grouped.map { case ((_, _, _), edgeGroup) => - val (deleteAllEdges, edges) = edgeGroup.partition(_.op == 
GraphUtil.operations("deleteAll")) - - // DeleteAll first - val deleteAllFutures = deleteAllEdges.map { edge => - deleteAllAdjacentEdges(Seq(edge.srcVertex), Seq(edge.label), edge.labelWithDir.dir, edge.ts) - } - + val edges = edgeGroup.map(_._1) + val idxs = edgeGroup.map(_._2) // After deleteAll, process others - lazy val mutateEdgeFutures = edges.toList match { + val mutateEdgeFutures = edges.toList match { case head :: tail => - // val strongConsistency = edges.head.label.consistencyLevel == "strong" - // if (strongConsistency) { val edgeFuture = mutateEdgesInner(edges, checkConsistency = true , withWait) //TODO: decide what we will do on failure on vertex put val puts = buildVertexPutsAsync(head) val vertexFuture = writeToStorage(head.label.hbaseZkAddr, puts, withWait) Seq(edgeFuture, vertexFuture) - // } else { - // edges.map { edge => mutateEdge(edge, withWait = withWait) } - // } case Nil => Nil } val composed = for { - deleteRet <- Future.sequence(deleteAllFutures) +// deleteRet <- Future.sequence(deleteAllFutures) mutateRet <- Future.sequence(mutateEdgeFutures) - } yield deleteRet ++ mutateRet + } yield mutateRet - composed.map(_.forall(identity)) + composed.map(_.forall(identity)).map { ret => idxs.map(idx => idx -> ret) } } - Future.sequence(mutateEdges) + Future.sequence(mutateEdges).map { squashedRets => + squashedRets.flatten.sortBy { case (idx, ret) => idx }.map(_._2) + } } def mutateVertex(vertex: Vertex, withWait: Boolean): Future[Boolean] = { @@ -413,16 +355,22 @@ abstract class Storage[R](val graph: Graph, checkConsistency: Boolean, withWait: Boolean): Future[Boolean] = { assert(edges.nonEmpty) + // TODO:: remove after code review: unreachable code if (!checkConsistency) { + val zkQuorum = edges.head.label.hbaseZkAddr val futures = edges.map { edge => val (_, edgeUpdate) = Edge.buildOperation(None, Seq(edge)) - val mutations = indexedEdgeMutations(edgeUpdate) ++ snapshotEdgeMutations(edgeUpdate) ++ increments(edgeUpdate) + + val mutations = + indexedEdgeMutations(edgeUpdate) ++ snapshotEdgeMutations(edgeUpdate) ++ increments(edgeUpdate) + + writeToStorage(zkQuorum, mutations, withWait) } Future.sequence(futures).map { rets => rets.forall(identity) } } else { - fetchSnapshotEdge(edges.head).flatMap { case (queryParam, snapshotEdgeOpt, kvOpt) => + fetchSnapshotEdgeInner(edges.head).flatMap { case (queryParam, snapshotEdgeOpt, kvOpt) => retry(1)(edges, 0, snapshotEdgeOpt) } } @@ -438,9 +386,6 @@ abstract class Storage[R](val graph: Graph, if (tryNum >= MaxRetryNum) { edges.foreach { edge => logger.error(s"commit failed after $MaxRetryNum\n${edge.toLogString}") - - val kafkaMessage = ExceptionHandler.toKafkaMessage(failTopic, element = edge) - exceptionHandler.enqueue(kafkaMessage) } Future.successful(false) @@ -454,11 +399,10 @@ abstract class Storage[R](val graph: Graph, case FetchTimeoutException(retryEdge) => logger.info(s"[Try: $tryNum], Fetch fail.\n${retryEdge}") /** fetch failed. re-fetch should be done */ - fetchSnapshotEdge(edges.head).flatMap { case (queryParam, snapshotEdgeOpt, kvOpt) => + fetchSnapshotEdgeInner(edges.head).flatMap { case (queryParam, snapshotEdgeOpt, kvOpt) => retry(tryNum + 1)(edges, statusCode, snapshotEdgeOpt) } - case PartialFailureException(retryEdge, failedStatusCode, faileReason) => val status = failedStatusCode match { case 0 => "AcquireLock failed." @@ -477,7 +421,7 @@ abstract class Storage[R](val graph: Graph, val future = if (failedStatusCode == 0) { // acquire Lock failed. 
other is mutating so this thead need to re-fetch snapshotEdge. /** fetch failed. re-fetch should be done */ - fetchSnapshotEdge(edges.head).flatMap { case (queryParam, snapshotEdgeOpt, kvOpt) => + fetchSnapshotEdgeInner(edges.head).flatMap { case (queryParam, snapshotEdgeOpt, kvOpt) => retry(tryNum + 1)(edges, statusCode, snapshotEdgeOpt) } } else { @@ -620,7 +564,8 @@ abstract class Storage[R](val graph: Graph, /** * orchestrate commit process. * we separate into 4 step to avoid duplicating each step over and over. - * @param statusCode: current statusCode of this thread to process edges. + * + * @param statusCode: current statusCode of this thread to process edges. * @param squashedEdge: squashed(in memory) final edge from input edges on same snapshotEdge. * @param fetchedSnapshotEdgeOpt: fetched snapshotEdge from storage before commit process begin. * @param lockSnapshotEdge: lockEdge that hold necessary data to lock this snapshotEdge for this thread. @@ -643,7 +588,7 @@ abstract class Storage[R](val graph: Graph, } yield lockReleased } - case class PartialFailureException(edge: Edge, statusCode: Byte, failReason: String) extends Exception + case class PartialFailureException(edge: Edge, statusCode: Byte, failReason: String) extends NoStackException(failReason) protected def debug(ret: Boolean, phase: String, snapshotEdge: SnapshotEdge) = { val msg = Seq(s"[$ret] [$phase]", s"${snapshotEdge.toLogString()}").mkString("\n") @@ -658,7 +603,8 @@ abstract class Storage[R](val graph: Graph, /** * try to acquire lock on storage for this given snapshotEdge(lockEdge). - * @param statusCode: current statusCode of this thread to process edges. + * + * @param statusCode: current statusCode of this thread to process edges. * @param squashedEdge: squashed(in memory) final edge from input edges on same snapshotEdge. only for debug * @param fetchedSnapshotEdgeOpt: fetched snapshot edge from storage. * @param lockEdge: lockEdge to build RPC request(compareAndSet) into Storage. @@ -708,7 +654,8 @@ abstract class Storage[R](val graph: Graph, * change this snapshot's state on storage from locked into committed by * storing new merged states on storage. merge state come from releaseLockEdge. * note that releaseLock return Future.failed on predicate failure. - * @param predicate: indicate if this releaseLock phase should be proceed or not. + * + * @param predicate: indicate if this releaseLock phase should be proceed or not. * @param statusCode: releaseLock do not use statusCode, only for debug. * @param squashedEdge: squashed(in memory) final edge from input edges on same snapshotEdge. only for debug * @param releaseLockEdge: final merged states if all process goes well. 
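To make the retry/commit control flow easier to follow, here is a schematic sketch (not the actual Storage code) of the four phases the javadocs above describe: acquire the lock on the snapshot edge, commit the merged mutations, apply increments, and release the lock, with each phase gated on the previous one the way the predicate parameter gates releaseLock. The error strings are placeholders, not the library's.

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

// Schematic only: the real code threads statusCode/squashedEdge through each
// phase and turns failures into PartialFailureException for the retry loop.
def commitSketch(acquireLock: () => Future[Boolean],
                 commit:      () => Future[Boolean],
                 increment:   () => Future[Boolean],
                 releaseLock: () => Future[Boolean]): Future[Boolean] =
  for {
    locked       <- acquireLock()
    committed    <- if (locked) commit() else Future.failed(new RuntimeException("AcquireLock failed."))
    incremented  <- if (committed) increment() else Future.failed(new RuntimeException("Mutation failed."))
    lockReleased <- if (incremented) releaseLock() else Future.failed(new RuntimeException("Increment failed."))
  } yield lockReleased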
@@ -797,6 +744,18 @@ abstract class Storage[R](val graph: Graph, statusCode: Byte, squashedEdge: Edge, edgeMutate: EdgeMutate): Future[Boolean] = { + + def _write(kvs: Seq[SKeyValue], withWait: Boolean): Future[Boolean] = { + writeToStorage(squashedEdge.label.hbaseZkAddr, kvs, withWait = withWait).map { ret => + if (ret) { + debug(ret, "increment", squashedEdge.toSnapshotEdge, edgeMutate) + } else { + throw new PartialFailureException(squashedEdge, 2, "hbase fail.") + } + true + } + } + if (!predicate) Future.failed(new PartialFailureException(squashedEdge, 2, "predicate failed.")) if (statusCode >= 3) { logger.debug(s"skip increment: [$statusCode]\n${squashedEdge.toLogString}") @@ -804,21 +763,13 @@ abstract class Storage[R](val graph: Graph, } else { val p = Random.nextDouble() if (p < FailProb) Future.failed(new PartialFailureException(squashedEdge, 2, s"$p")) - else - writeToStorage(squashedEdge.label.hbaseZkAddr, increments(edgeMutate), withWait = true).map { ret => - if (ret) { - debug(ret, "increment", squashedEdge.toSnapshotEdge, edgeMutate) - } else { - throw new PartialFailureException(squashedEdge, 2, "hbase fail.") - } - true - } + else { + val incrs = increments(edgeMutate) + _write(incrs, true) + } } } - - - /** end of methods for consistency */ def mutateLog(snapshotEdgeOpt: Option[Edge], edges: Seq[Edge], @@ -832,17 +783,18 @@ abstract class Storage[R](val graph: Graph, /** Delete All */ - protected def deleteAllFetchedEdgesAsyncOld(stepInnerResult: StepInnerResult, + def deleteAllFetchedEdgesAsyncOld(stepInnerResult: StepResult, requestTs: Long, retryNum: Int): Future[Boolean] = { if (stepInnerResult.isEmpty) Future.successful(true) else { - val head = stepInnerResult.edgesWithScoreLs.head + val head = stepInnerResult.edgeWithScores.head val zkQuorum = head.edge.label.hbaseZkAddr val futures = for { - edgeWithScore <- stepInnerResult.edgesWithScoreLs - (edge, score) = EdgeWithScore.unapply(edgeWithScore).get + edgeWithScore <- stepInnerResult.edgeWithScores } yield { + val edge = edgeWithScore.edge + val score = edgeWithScore.score /** reverted direction */ val reversedIndexedEdgesMutations = edge.duplicateEdge.edgesWithIndex.flatMap { indexEdge => indexEdgeSerializer(indexEdge).toKeyValues.map(_.copy(operation = SKeyValue.Delete)) ++ @@ -861,141 +813,6 @@ abstract class Storage[R](val graph: Graph, } } - protected def buildEdgesToDelete(stepInnerResult: StepInnerResult, requestTs: Long): StepInnerResult = { - val filtered = stepInnerResult.edgesWithScoreLs.filter { edgeWithScore => - (edgeWithScore.edge.ts < requestTs) && !edgeWithScore.edge.isDegree - } - if (filtered.isEmpty) StepInnerResult.Empty - else { - val head = filtered.head - val label = head.edge.label - val edgeWithScoreLs = filtered.map { edgeWithScore => - val (newOp, newVersion, newPropsWithTs) = label.consistencyLevel match { - case "strong" => - val _newPropsWithTs = edgeWithScore.edge.propsWithTs ++ - Map(LabelMeta.timeStampSeq -> InnerValLikeWithTs.withLong(requestTs, requestTs, label.schemaVersion)) - (GraphUtil.operations("delete"), requestTs, _newPropsWithTs) - case _ => - val oldEdge = edgeWithScore.edge - (oldEdge.op, oldEdge.version, oldEdge.propsWithTs) - } - - val copiedEdge = - edgeWithScore.edge.copy(op = newOp, version = newVersion, propsWithTs = newPropsWithTs) - - val edgeToDelete = edgeWithScore.copy(edge = copiedEdge) - // logger.debug(s"delete edge from deleteAll: ${edgeToDelete.edge.toLogString}") - edgeToDelete - } - //Degree edge? 
- StepInnerResult(edgeWithScoreLs, Nil, false) - } - } - - protected def deleteAllFetchedEdgesLs(stepInnerResultLs: Seq[StepInnerResult], - requestTs: Long): Future[(Boolean, Boolean)] = { - stepInnerResultLs.foreach { stepInnerResult => - if (stepInnerResult.isFailure) throw new RuntimeException("fetched result is fallback.") - } - val futures = for { - stepInnerResult <- stepInnerResultLs - deleteStepInnerResult = buildEdgesToDelete(stepInnerResult, requestTs) - if deleteStepInnerResult.edgesWithScoreLs.nonEmpty - } yield { - val head = deleteStepInnerResult.edgesWithScoreLs.head - val label = head.edge.label - label.schemaVersion match { - case HBaseType.VERSION3 | HBaseType.VERSION4 => - if (label.consistencyLevel == "strong") { - /** - * read: snapshotEdge on queryResult = O(N) - * write: N x (relatedEdges x indices(indexedEdge) + 1(snapshotEdge)) - */ - mutateEdges(deleteStepInnerResult.edgesWithScoreLs.map(_.edge), withWait = true).map(_.forall(identity)) - } else { - deleteAllFetchedEdgesAsyncOld(deleteStepInnerResult, requestTs, MaxRetryNum) - } - case _ => - - /** - * read: x - * write: N x ((1(snapshotEdge) + 2(1 for incr, 1 for delete) x indices) - */ - deleteAllFetchedEdgesAsyncOld(deleteStepInnerResult, requestTs, MaxRetryNum) - } - } - - if (futures.isEmpty) { - // all deleted. - Future.successful(true -> true) - } else { - Future.sequence(futures).map { rets => false -> rets.forall(identity) } - } - } - - protected def fetchAndDeleteAll(queries: Seq[Query], requestTs: Long): Future[(Boolean, Boolean)] = { - val future = for { - stepInnerResultLs <- Future.sequence(queries.map(getEdgesStepInner(_))) - (allDeleted, ret) <- deleteAllFetchedEdgesLs(stepInnerResultLs, requestTs) - } yield { -// logger.debug(s"fetchAndDeleteAll: ${allDeleted}, ${ret}") - (allDeleted, ret) - } - - Extensions.retryOnFailure(MaxRetryNum) { - future - } { - logger.error(s"fetch and deleteAll failed.") - (true, false) - } - - } - - def deleteAllAdjacentEdges(srcVertices: Seq[Vertex], - labels: Seq[Label], - dir: Int, - ts: Long): Future[Boolean] = { - - def enqueueLogMessage() = { - val kafkaMessages = for { - vertice <- srcVertices - id = vertice.innerId.toIdString() - label <- labels - } yield { - val tsv = Seq(ts, "deleteAll", "e", id, id, label.label, "{}", GraphUtil.fromOp(dir.toByte)).mkString("\t") - ExceptionHandler.toKafkaMessage(failTopic, tsv) - } - - kafkaMessages.foreach(exceptionHandler.enqueue) - } - - val requestTs = ts - /** create query per label */ - val queries = for { - label <- labels - } yield { - val labelWithDir = LabelWithDirection(label.id.get, dir) - val queryParam = QueryParam(labelWithDir).limit(0, DeleteAllFetchSize).duplicatePolicy(Option(Query.DuplicatePolicy.Raw)) - val step = Step(List(queryParam)) - Query(srcVertices, Vector(step)) - } - - // Extensions.retryOnSuccessWithBackoff(MaxRetryNum, Random.nextInt(MaxBackOff) + 1) { - val retryFuture = Extensions.retryOnSuccess(DeleteAllFetchCount) { - fetchAndDeleteAll(queries, requestTs) - } { case (allDeleted, deleteSuccess) => - allDeleted && deleteSuccess - }.map { case (allDeleted, deleteSuccess) => allDeleted && deleteSuccess } - - retryFuture onFailure { - case ex => - logger.error(s"[Error]: deleteAllAdjacentEdges failed.") - enqueueLogMessage() - } - - retryFuture - } - /** End Of Delete All */ @@ -1003,29 +820,34 @@ abstract class Storage[R](val graph: Graph, /** Parsing Logic: parse from kv from Storage into Edge */ def toEdge[K: CanSKeyValue](kv: K, - queryParam: QueryParam, + queryRequest: QueryRequest, 
cacheElementOpt: Option[IndexEdge], parentEdges: Seq[EdgeWithScore]): Option[Edge] = { -// logger.debug(s"toEdge: $kv") + logger.debug(s"toEdge: $kv") + try { + val queryOption = queryRequest.query.queryOption + val queryParam = queryRequest.queryParam val schemaVer = queryParam.label.schemaVersion - val indexEdgeOpt = indexEdgeDeserializer(schemaVer).fromKeyValues(queryParam, Seq(kv), queryParam.label.schemaVersion, cacheElementOpt) - indexEdgeOpt.map(indexEdge => indexEdge.toEdge.copy(parentEdges = parentEdges)) + val indexEdgeOpt = indexEdgeDeserializer(schemaVer).fromKeyValues(Option(queryParam.label), Seq(kv), queryParam.label.schemaVersion, cacheElementOpt) + if (!queryOption.returnTree) indexEdgeOpt.map(indexEdge => indexEdge.toEdge.copy(parentEdges = parentEdges)) + else indexEdgeOpt.map(indexEdge => indexEdge.toEdge) } catch { case ex: Exception => - logger.error(s"Fail on toEdge: ${kv.toString}, ${queryParam}", ex) + logger.error(s"Fail on toEdge: ${kv.toString}, ${queryRequest}", ex) None } } def toSnapshotEdge[K: CanSKeyValue](kv: K, - queryParam: QueryParam, + queryRequest: QueryRequest, cacheElementOpt: Option[SnapshotEdge] = None, isInnerCall: Boolean, parentEdges: Seq[EdgeWithScore]): Option[Edge] = { // logger.debug(s"SnapshottoEdge: $kv") + val queryParam = queryRequest.queryParam val schemaVer = queryParam.label.schemaVersion - val snapshotEdgeOpt = snapshotEdgeDeserializer(schemaVer).fromKeyValues(queryParam, Seq(kv), queryParam.label.schemaVersion, cacheElementOpt) + val snapshotEdgeOpt = snapshotEdgeDeserializer(schemaVer).fromKeyValues(Option(queryParam.label), Seq(kv), queryParam.label.schemaVersion, cacheElementOpt) if (isInnerCall) { snapshotEdgeOpt.flatMap { snapshotEdge => @@ -1045,87 +867,137 @@ abstract class Storage[R](val graph: Graph, } } + val dummyCursor: Array[Byte] = Array.empty + def toEdges[K: CanSKeyValue](kvs: Seq[K], - queryParam: QueryParam, + queryRequest: QueryRequest, prevScore: Double = 1.0, isInnerCall: Boolean, parentEdges: Seq[EdgeWithScore], startOffset: Int = 0, - len: Int = Int.MaxValue): Seq[EdgeWithScore] = { - if (kvs.isEmpty) Seq.empty + len: Int = Int.MaxValue): StepResult = { + + val toSKeyValue = implicitly[CanSKeyValue[K]].toSKeyValue _ + + if (kvs.isEmpty) StepResult.Empty.copy(cursors = Seq(dummyCursor)) else { + val queryOption = queryRequest.query.queryOption + val queryParam = queryRequest.queryParam + val labelWeight = queryRequest.labelWeight + val nextStepOpt = queryRequest.nextStepOpt + val where = queryParam.where.get + val label = queryParam.label + val isDefaultTransformer = queryParam.edgeTransformer.isDefault val first = kvs.head val kv = first val schemaVer = queryParam.label.schemaVersion val cacheElementOpt = if (queryParam.isSnapshotEdge) None - else indexEdgeDeserializer(schemaVer).fromKeyValues(queryParam, Seq(kv), queryParam.label.schemaVersion, None) + else indexEdgeDeserializer(schemaVer).fromKeyValues(Option(queryParam.label), Seq(kv), queryParam.label.schemaVersion, None) + + val (degreeEdges, keyValues) = cacheElementOpt match { + case None => (Nil, kvs) + case Some(cacheElement) => + val head = cacheElement.toEdge + if (!head.isDegree) (Nil, kvs) + else (Seq(EdgeWithScore(head, 1.0, label)), kvs.tail) + } - for { - (kv, idx) <- kvs.zipWithIndex if idx >= startOffset && idx < startOffset + len - edge <- - if (queryParam.isSnapshotEdge) toSnapshotEdge(kv, queryParam, None, isInnerCall, parentEdges) - else toEdge(kv, queryParam, cacheElementOpt, parentEdges) - } yield { - //TODO: Refactor this. 
@@ -1045,87 +867,137 @@ abstract class Storage[R](val graph: Graph,
     }
   }

+  val dummyCursor: Array[Byte] = Array.empty
+
   def toEdges[K: CanSKeyValue](kvs: Seq[K],
-                               queryParam: QueryParam,
+                               queryRequest: QueryRequest,
                                prevScore: Double = 1.0,
                                isInnerCall: Boolean,
                                parentEdges: Seq[EdgeWithScore],
                                startOffset: Int = 0,
-                               len: Int = Int.MaxValue): Seq[EdgeWithScore] = {
-    if (kvs.isEmpty) Seq.empty
+                               len: Int = Int.MaxValue): StepResult = {
+
+    val toSKeyValue = implicitly[CanSKeyValue[K]].toSKeyValue _
+
+    if (kvs.isEmpty) StepResult.Empty.copy(cursors = Seq(dummyCursor))
     else {
+      val queryOption = queryRequest.query.queryOption
+      val queryParam = queryRequest.queryParam
+      val labelWeight = queryRequest.labelWeight
+      val nextStepOpt = queryRequest.nextStepOpt
+      val where = queryParam.where.get
+      val label = queryParam.label
+      val isDefaultTransformer = queryParam.edgeTransformer.isDefault
       val first = kvs.head
       val kv = first
       val schemaVer = queryParam.label.schemaVersion
       val cacheElementOpt =
         if (queryParam.isSnapshotEdge) None
-        else indexEdgeDeserializer(schemaVer).fromKeyValues(queryParam, Seq(kv), queryParam.label.schemaVersion, None)
+        else indexEdgeDeserializer(schemaVer).fromKeyValues(Option(queryParam.label), Seq(kv), queryParam.label.schemaVersion, None)
+
+      val (degreeEdges, keyValues) = cacheElementOpt match {
+        case None => (Nil, kvs)
+        case Some(cacheElement) =>
+          val head = cacheElement.toEdge
+          if (!head.isDegree) (Nil, kvs)
+          else (Seq(EdgeWithScore(head, 1.0, label)), kvs.tail)
+      }

-      for {
-        (kv, idx) <- kvs.zipWithIndex if idx >= startOffset && idx < startOffset + len
-        edge <-
-          if (queryParam.isSnapshotEdge) toSnapshotEdge(kv, queryParam, None, isInnerCall, parentEdges)
-          else toEdge(kv, queryParam, cacheElementOpt, parentEdges)
-      } yield {
-        //TODO: Refactor this.
-        val currentScore =
-          queryParam.scorePropagateOp match {
-            case "plus" => edge.rank(queryParam.rank) + prevScore
-            case "divide" =>
-              if ((prevScore + queryParam.scorePropagateShrinkage) == 0) 0
-              else edge.rank(queryParam.rank) / (prevScore + queryParam.scorePropagateShrinkage)
-            case _ => edge.rank(queryParam.rank) * prevScore
+      val lastCursor: Seq[Array[Byte]] = Seq(if (keyValues.nonEmpty) toSKeyValue(keyValues(keyValues.length - 1)).row else dummyCursor)
+
+      if (!queryOption.ignorePrevStepCache) {
+        val edgeWithScores = for {
+          (kv, idx) <- keyValues.zipWithIndex if idx >= startOffset && idx < startOffset + len
+          edge <- (if (queryParam.isSnapshotEdge) toSnapshotEdge(kv, queryRequest, None, isInnerCall, parentEdges) else toEdge(kv, queryRequest, cacheElementOpt, parentEdges)).toSeq
+          if where == WhereParser.success || where.filter(edge)
+          convertedEdge <- if (isDefaultTransformer) Seq(edge) else convertEdges(queryParam, edge, nextStepOpt)
+        } yield {
+          val score = edge.rank(queryParam.rank)
+          EdgeWithScore(convertedEdge, score, label)
+        }
+        StepResult(edgeWithScores = edgeWithScores, grouped = Nil, degreeEdges = degreeEdges, cursors = lastCursor)
+      } else {
+        val degreeScore = 0.0
+
+        val edgeWithScores = for {
+          (kv, idx) <- keyValues.zipWithIndex if idx >= startOffset && idx < startOffset + len
+          edge <- (if (queryParam.isSnapshotEdge) toSnapshotEdge(kv, queryRequest, None, isInnerCall, parentEdges) else toEdge(kv, queryRequest, cacheElementOpt, parentEdges)).toSeq
+          if where == WhereParser.success || where.filter(edge)
+          convertedEdge <- if (isDefaultTransformer) Seq(edge) else convertEdges(queryParam, edge, nextStepOpt)
+        } yield {
+          val edgeScore = edge.rank(queryParam.rank)
+          val score = queryParam.scorePropagateOp match {
+            case "plus" => edgeScore + prevScore
+            case "divide" =>
+              if ((prevScore + queryParam.scorePropagateShrinkage) == 0) 0
+              else edgeScore / (prevScore + queryParam.scorePropagateShrinkage)
+            case _ => edgeScore * prevScore
           }
+          val tsVal = processTimeDecay(queryParam, edge)
+          val newScore = degreeScore + score
+          EdgeWithScore(convertedEdge.copy(parentEdges = parentEdges), score = newScore * labelWeight * tsVal, label = label)
         }
-        EdgeWithScore(edge, currentScore)
+
+        val sampled =
+          if (queryRequest.queryParam.sample >= 0) sample(queryRequest, edgeWithScores, queryRequest.queryParam.sample)
+          else edgeWithScores
+
+        val normalized = if (queryParam.shouldNormalize) normalize(sampled) else sampled
+
+        StepResult(edgeWithScores = normalized, grouped = Nil, degreeEdges = degreeEdges, cursors = lastCursor)
      }
     }
   }
   /** End Of Parse Logic */

-  protected def toRequestEdge(queryRequest: QueryRequest): Edge = {
+  protected def toRequestEdge(queryRequest: QueryRequest, parentEdges: Seq[EdgeWithScore]): Edge = {
     val srcVertex = queryRequest.vertex
-    // val tgtVertexOpt = queryRequest.tgtVertexOpt
-    val edgeCf = Serializable.edgeCf
     val queryParam = queryRequest.queryParam
     val tgtVertexIdOpt = queryParam.tgtVertexInnerIdOpt
     val label = queryParam.label
     val labelWithDir = queryParam.labelWithDir
     val (srcColumn, tgtColumn) = label.srcTgtColumn(labelWithDir.dir)
-    val (srcInnerId, tgtInnerId) = tgtVertexIdOpt match {
+    val propsWithTs = label.EmptyPropsWithTs
+
+    tgtVertexIdOpt match {
       case Some(tgtVertexId) => // _to is given.
         /** we use toSnapshotEdge so dont need to swap src, tgt */
         val src = InnerVal.convertVersion(srcVertex.innerId, srcColumn.columnType, label.schemaVersion)
         val tgt = InnerVal.convertVersion(tgtVertexId, tgtColumn.columnType, label.schemaVersion)
-        (src, tgt)
+        val (srcVId, tgtVId) = (SourceVertexId(srcColumn.id.get, src), TargetVertexId(tgtColumn.id.get, tgt))
+        val (srcV, tgtV) = (Vertex(srcVId), Vertex(tgtVId))
+
+        Edge(srcV, tgtV, label, labelWithDir.dir, propsWithTs = propsWithTs)
       case None =>
         val src = InnerVal.convertVersion(srcVertex.innerId, srcColumn.columnType, label.schemaVersion)
-        (src, src)
-    }
+        val srcVId = SourceVertexId(srcColumn.id.get, src)
+        val srcV = Vertex(srcVId)

-    val (srcVId, tgtVId) = (SourceVertexId(srcColumn.id.get, srcInnerId), TargetVertexId(tgtColumn.id.get, tgtInnerId))
-    val (srcV, tgtV) = (Vertex(srcVId), Vertex(tgtVId))
-    val currentTs = System.currentTimeMillis()
-    val propsWithTs = Map(LabelMeta.timeStampSeq -> InnerValLikeWithTs(InnerVal.withLong(currentTs, label.schemaVersion), currentTs)).toMap
-    Edge(srcV, tgtV, labelWithDir, propsWithTs = propsWithTs)
+        Edge(srcV, srcV, label, labelWithDir.dir, propsWithTs = propsWithTs, parentEdges = parentEdges)
+    }
   }

+  protected def fetchSnapshotEdgeInner(edge: Edge): Future[(QueryParam, Option[Edge], Option[SKeyValue])] = {
+    /** TODO: Fix this. currently fetchSnapshotEdge should not use future cache
+      * so use empty cacheKey.
+      * */
+    val queryParam = QueryParam(labelName = edge.label.label,
+      direction = GraphUtil.fromDirection(edge.labelWithDir.dir),
+      tgtVertexIdOpt = Option(edge.tgtVertex.innerIdVal),
+      cacheTTLInMillis = -1)
+    val q = Query.toQuery(Seq(edge.srcVertex), queryParam)
+    val queryRequest = QueryRequest(q, 0, edge.srcVertex, queryParam)
+    // val q = Query.toQuery(Seq(edge.srcVertex), queryParam)

-  protected def fetchSnapshotEdge(edge: Edge): Future[(QueryParam, Option[Edge], Option[SKeyValue])] = {
-    val labelWithDir = edge.labelWithDir
-    val queryParam = QueryParam(labelWithDir)
-    val _queryParam = queryParam.tgtVertexInnerIdOpt(Option(edge.tgtVertex.innerId))
-    val q = Query.toQuery(Seq(edge.srcVertex), _queryParam)
-    val queryRequest = QueryRequest(q, 0, edge.srcVertex, _queryParam)
-
-    fetchSnapshotEdgeKeyValues(buildRequest(queryRequest)).map { kvs =>
+    fetchSnapshotEdgeKeyValues(queryRequest).map { kvs =>
       val (edgeOpt, kvOpt) =
         if (kvs.isEmpty) (None, None)
         else {
-          val _edgeOpt = toEdges(kvs, queryParam, 1.0, isInnerCall = true, parentEdges = Nil).headOption.map(_.edge)
+          val snapshotEdgeOpt = toSnapshotEdge(kvs.head, queryRequest, isInnerCall = true, parentEdges = Nil)
           val _kvOpt = kvs.headOption
-          (_edgeOpt, _kvOpt)
+          (snapshotEdgeOpt, _kvOpt)
         }
       (queryParam, edgeOpt, kvOpt)
     } recoverWith { case ex: Throwable =>
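In its full-scoring branch, the new toEdges composes four ingredients per edge: the edge's own rank, the previous step's score folded in via scorePropagateOp ("plus", "divide" guarded by the scorePropagateShrinkage term, default multiply), a time-decay factor from processTimeDecay, and the per-label weight; normalize then rescales a step's scores to sum to 1. A small pure-function sketch of that arithmetic, with the s2graph types reduced to plain doubles and the time-decay factor treated as an opaque input:

object ScoreSketch {
  // Fold the previous step's score into this edge's rank (cf. scorePropagateOp).
  def propagate(op: String, edgeScore: Double, prevScore: Double, shrinkage: Double): Double =
    op match {
      case "plus"   => edgeScore + prevScore
      case "divide" =>
        // shrinkage keeps the denominator away from zero
        if (prevScore + shrinkage == 0) 0.0
        else edgeScore / (prevScore + shrinkage)
      case _        => edgeScore * prevScore // default: multiply
    }

  // Final per-edge score as combined in toEdges: the propagated score
  // scaled by the label weight and the time-decay factor.
  def combined(op: String, edgeScore: Double, prevScore: Double,
               shrinkage: Double, labelWeight: Double, tsDecay: Double): Double =
    propagate(op, edgeScore, prevScore, shrinkage) * labelWeight * tsDecay

  // Rescale one step's scores so they sum to 1 (mirrors `normalize`).
  def normalize(scores: Seq[Double]): Seq[Double] = {
    val sum = scores.sum
    if (sum == 0) scores else scores.map(_ / sum)
  }
}

For example, combined("divide", edgeScore = 2.0, prevScore = 3.0, shrinkage = 1.0, labelWeight = 1.0, tsDecay = 1.0) yields 2.0 / (3.0 + 1.0) = 0.5.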
@@ -1134,182 +1006,7 @@ abstract class Storage[R](val graph: Graph,
     }
   }

-  protected def fetchStep(orgQuery: Query,
-                          stepIdx: Int,
-                          stepInnerResult: StepInnerResult): Future[StepInnerResult] = {
-    if (stepInnerResult.isEmpty) Future.successful(StepInnerResult.Empty)
-    else {
-      val edgeWithScoreLs = stepInnerResult.edgesWithScoreLs
-
-      val q = orgQuery
-
-      val prevStepOpt = if (stepIdx > 0) Option(q.steps(stepIdx - 1)) else None
-      val prevStepThreshold = prevStepOpt.map(_.nextStepScoreThreshold).getOrElse(QueryParam.DefaultThreshold)
-      val prevStepLimit = prevStepOpt.map(_.nextStepLimit).getOrElse(-1)
-      val step = q.steps(stepIdx)
-
-      val alreadyVisited =
-        if (stepIdx == 0) Map.empty[(LabelWithDirection, Vertex), Boolean]
-        else Graph.alreadyVisitedVertices(stepInnerResult.edgesWithScoreLs)
-
-      val groupedBy = edgeWithScoreLs.map { case edgeWithScore =>
-        edgeWithScore.edge.tgtVertex -> edgeWithScore
-      }.groupBy { case (vertex, edgeWithScore) => vertex }
-
-      val groupedByFiltered = for {
-        (vertex, edgesWithScore) <- groupedBy
-        aggregatedScore = edgesWithScore.map(_._2.score).sum if aggregatedScore >= prevStepThreshold
-      } yield vertex -> aggregatedScore
-
-      val prevStepTgtVertexIdEdges = for {
-        (vertex, edgesWithScore) <- groupedBy
-      } yield vertex.id -> edgesWithScore.map { case (vertex, edgeWithScore) => edgeWithScore }
-
-      val nextStepSrcVertices = if (prevStepLimit >= 0) {
-        groupedByFiltered.toSeq.sortBy(-1 * _._2).take(prevStepLimit)
-      } else {
-        groupedByFiltered.toSeq
-      }
-
-      val queryRequests = for {
-        (vertex, prevStepScore) <- nextStepSrcVertices
-        queryParam <- step.queryParams
-      } yield (QueryRequest(q, stepIdx, vertex, queryParam), prevStepScore)
-
-      val fetchedLs = fetches(queryRequests, prevStepTgtVertexIdEdges)
-      Graph.filterEdges(orgQuery, stepIdx, queryRequests.map(_._1), fetchedLs, orgQuery.steps(stepIdx).queryParams, alreadyVisited)(ec)
-    }
-  }
-  private def getEdgesStepInner(q: Query): Future[StepInnerResult] = {
-    Try {
-      if (q.steps.isEmpty) innerFallback
-      else {
-        // current stepIdx = -1
-        val startStepInnerResult = QueryResult.fromVertices(q)
-        q.steps.zipWithIndex.foldLeft(Future.successful(startStepInnerResult)) { case (prevStepInnerResultFuture, (step, stepIdx)) =>
-          for {
-            prevStepInnerResult <- prevStepInnerResultFuture
-            currentStepInnerResult <- fetchStep(q, stepIdx, prevStepInnerResult)
-          } yield currentStepInnerResult
-        }
-      }
-    } recover {
-      case e: Exception =>
-        logger.error(s"getEdgesAsync: $e", e)
-        innerFallback
-    } get
-  }
-  def getEdges(q: Query): Future[StepResult] = {
-    Try {
-      if (q.steps.isEmpty) {
-        // TODO: this should be get vertex query.
-        fallback
-      } else {
-        val filterOutFuture = q.queryOption.filterOutQuery match {
-          case None => innerFallback
-          case Some(filterOutQuery) => getEdgesStepInner(filterOutQuery)
-        }
-        for {
-          innerResult <- getEdgesStepInner(q)
-          filterOutInnerResult <- filterOutFuture
-        } yield {
-          val result = StepResult(graph, q.queryOption, innerResult)
-          if (filterOutInnerResult.isEmpty) result
-          else {
-            StepResult.filterOut(graph, q.queryOption, result, filterOutInnerResult)
-          }
-        }
-      }
-    } recover {
-      case e: Exception =>
-        logger.error(s"getEdgesAsync: $e", e)
-        fallback
-    } get
-  }
-  def getEdgesMultiQuery(mq: MultiQuery): Future[StepResult] = {
-    val fallback = Future.successful(StepResult.Empty)
-
-    Try {
-      if (mq.queries.isEmpty) fallback
-      else {
-        val filterOutFuture = mq.queryOption.filterOutQuery match {
-          case None => innerFallback
-          case Some(filterOutQuery) => getEdgesStepInner(filterOutQuery)
-        }
-
-        val multiQueryFutures = Future.sequence(mq.queries.map { query => getEdges(query) })
-        for {
-          multiQueryResults <- multiQueryFutures
-          filterOutInnerResult <- filterOutFuture
-        } yield {
-          val merged = StepResult.merges(mq.queryOption, multiQueryResults, mq.weights)
-          StepResult.filterOut(graph, mq.queryOption, merged, filterOutInnerResult)
-        }
-      }
-    } recover {
-      case e: Exception =>
-        logger.error(s"getEdgesAsync: $e", e)
-        fallback
-    } get
-  }
-
-  def checkEdges(params: Seq[(Vertex, Vertex, QueryParam)]): Future[StepResult] = {
-    val ts = System.currentTimeMillis()
-    val futures = for {
-      (srcVertex, tgtVertex, queryParam) <- params
-      propsWithTs = Map(LabelMeta.timeStampSeq -> InnerValLikeWithTs.withLong(ts, ts, queryParam.label.schemaVersion))
-      edge = Edge(srcVertex, tgtVertex, queryParam.labelWithDir, propsWithTs = propsWithTs)
-    } yield {
-      fetchSnapshotEdge(edge).map { case (queryParam, edgeOpt, kvOpt) =>
-        edgeOpt.toSeq.map(e => EdgeWithScore(e, 1.0))
-      }
-    }
-
-    Future.sequence(futures).map { edgeWithScoreLs =>
-      val s2EdgeWithScoreLs = edgeWithScoreLs.flatMap { ls =>
-        ls.map { edgeWithScore =>
-          S2EdgeWithScore(edgeWithScore.edge, edgeWithScore.score)
-        }
-      }
-      StepResult(results = s2EdgeWithScoreLs, grouped = Nil, degreeEdges = Nil)
-    }
-  }
-
-
-
-  @tailrec
-  final def randomInt(sampleNumber: Int, range: Int, set: Set[Int] = Set.empty[Int]): Set[Int] = {
-    if (range < sampleNumber || set.size == sampleNumber) set
-    else randomInt(sampleNumber, range, set + Random.nextInt(range))
-  }
-
-  protected def sample(queryRequest: QueryRequest, edges: Seq[EdgeWithScore], n: Int): Seq[EdgeWithScore] = {
-    if (edges.size <= n){
-      edges
-    }else{
-      val plainEdges = if (queryRequest.queryParam.offset == 0) {
-        edges.tail
-      } else edges
-
-      val randoms = randomInt(n, plainEdges.size)
-      var samples = List.empty[EdgeWithScore]
-      var idx = 0
-      plainEdges.foreach { e =>
-        if (randoms.contains(idx)) samples = e :: samples
-        idx += 1
-      }
-      samples.toSeq
-    }
-
-  }
-
-  protected def normalize(edgeWithScores: Seq[EdgeWithScore]): Seq[EdgeWithScore] = {
-    val sum = edgeWithScores.foldLeft(0.0) { case (acc, cur) => acc + cur.score }
-    edgeWithScores.map { edgeWithScore =>
-      edgeWithScore.copy(score = edgeWithScore.score / sum)
-    }
-  }

   /** end of query */

   /** Mutation Builder */
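The sample/randomInt pair removed in this hunk keeps n edges by drawing a set of distinct random indices; when the query offset is 0 the head element is skipped, presumably because the first key-value of a bucket carries the degree entry rather than a real edge (the new toEdges above peels it off the same way). A standalone sketch of that index-set sampling, with EdgeWithScore generalized to a type parameter:

import scala.annotation.tailrec
import scala.util.Random

object SampleSketch {
  // Draw up to `sampleNumber` distinct indices in [0, range).
  @tailrec
  def randomInt(sampleNumber: Int, range: Int, set: Set[Int] = Set.empty): Set[Int] =
    if (range < sampleNumber || set.size == sampleNumber) set
    else randomInt(sampleNumber, range, set + Random.nextInt(range))

  // Keep `n` elements, optionally dropping the degree entry at the head,
  // as the removed Storage.sample did when offset == 0.
  def sample[A](edges: Seq[A], n: Int, dropHead: Boolean): Seq[A] =
    if (edges.size <= n) edges
    else {
      val plain = if (dropHead) edges.tail else edges
      val picked = randomInt(n, plain.size)
      plain.zipWithIndex.collect { case (e, idx) if picked.contains(idx) => e }
    }
}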
@@ -1317,51 +1014,78 @@ abstract class Storage[R](val graph: Graph,

   /** EdgeMutate */
   def indexedEdgeMutations(edgeMutate: EdgeMutate): Seq[SKeyValue] = {
+    // skip sampling for delete operation
     val deleteMutations = edgeMutate.edgesToDelete.flatMap { indexEdge =>
-      indexEdgeSerializer(indexEdge).toKeyValues.map(_.copy(operation = SKeyValue.Delete))
+      indexEdgeSerializer(indexEdge).toKeyValues.map(_.copy(operation = SKeyValue.Delete, durability = indexEdge.label.durability))
     }
+
     val insertMutations = edgeMutate.edgesToInsert.flatMap { indexEdge =>
-      indexEdgeSerializer(indexEdge).toKeyValues.map(_.copy(operation = SKeyValue.Put))
+      if (indexEdge.isOutEdge) indexEdgeSerializer(indexEdge).toKeyValues.map(_.copy(operation = SKeyValue.Put, durability = indexEdge.label.durability))
+      else {
+        // For InEdge
+        indexEdgeSerializer(indexEdge).toKeyValues.map(_.copy(operation = SKeyValue.Put, durability = indexEdge.label.durability))
+      }
     }

     deleteMutations ++ insertMutations
   }

   def snapshotEdgeMutations(edgeMutate: EdgeMutate): Seq[SKeyValue] =
-    edgeMutate.newSnapshotEdge.map(e => snapshotEdgeSerializer(e).toKeyValues).getOrElse(Nil)
+    edgeMutate.newSnapshotEdge.map(e => snapshotEdgeSerializer(e).toKeyValues.map(_.copy(durability = e.label.durability))).getOrElse(Nil)
+
+  def incrementsInOut(edgeMutate: EdgeMutate): (Seq[SKeyValue], Seq[SKeyValue]) = {
+
+    def filterOutDegree(e: IndexEdge): Boolean = true

-  def increments(edgeMutate: EdgeMutate): Seq[SKeyValue] =
     (edgeMutate.edgesToDelete.isEmpty, edgeMutate.edgesToInsert.isEmpty) match {
       case (true, true) =>
         /** when there is no need to update. shouldUpdate == false */
-        List.empty
+        (Nil, Nil)
       case (true, false) =>
         /** no edges to delete but there is new edges to insert so increase degree by 1 */
-        edgeMutate.edgesToInsert.flatMap { e => buildIncrementsAsync(e) }
+        val (inEdges, outEdges) = edgeMutate.edgesToInsert.partition(_.isInEdge)
+
+        val in = inEdges.filter(filterOutDegree).flatMap(buildIncrementsAsync(_))
+        val out = outEdges.filter(filterOutDegree).flatMap(buildIncrementsAsync(_))
+
+        in -> out
       case (false, true) =>
         /** no edges to insert but there is old edges to delete so decrease degree by 1 */
-        edgeMutate.edgesToDelete.flatMap { e => buildIncrementsAsync(e, -1L) }
+        val (inEdges, outEdges) = edgeMutate.edgesToDelete.partition(_.isInEdge)
+
+        val in = inEdges.filter(filterOutDegree).flatMap(buildIncrementsAsync(_, -1))
+        val out = outEdges.filter(filterOutDegree).flatMap(buildIncrementsAsync(_, -1))
+
+        in -> out
       case (false, false) =>
         /** update on existing edges so no change on degree */
-        List.empty
+        (Nil, Nil)
     }
+  }
+
+  def increments(edgeMutate: EdgeMutate): Seq[SKeyValue] = {
+    val (in, out) = incrementsInOut(edgeMutate)
+    in ++ out
+  }

   /** IndexEdge */
   def buildIncrementsAsync(indexedEdge: IndexEdge, amount: Long = 1L): Seq[SKeyValue] = {
-    val newProps = indexedEdge.props ++ Map(LabelMeta.degreeSeq -> InnerValLikeWithTs.withLong(amount, indexedEdge.ts, indexedEdge.schemaVer))
+    val newProps = indexedEdge.props ++ Map(LabelMeta.degree -> InnerValLikeWithTs.withLong(amount, indexedEdge.ts, indexedEdge.schemaVer))
     val _indexedEdge = indexedEdge.copy(props = newProps)
-    indexEdgeSerializer(_indexedEdge).toKeyValues.map(_.copy(operation = SKeyValue.Increment))
+    indexEdgeSerializer(_indexedEdge).toKeyValues.map(_.copy(operation = SKeyValue.Increment, durability = _indexedEdge.label.durability))
   }

   def buildIncrementsCountAsync(indexedEdge: IndexEdge, amount: Long = 1L): Seq[SKeyValue] = {
-    val newProps = indexedEdge.props ++ Map(LabelMeta.countSeq -> InnerValLikeWithTs.withLong(amount, indexedEdge.ts, indexedEdge.schemaVer))
+    val newProps = indexedEdge.props ++ Map(LabelMeta.count -> InnerValLikeWithTs.withLong(amount, indexedEdge.ts, indexedEdge.schemaVer))
     val _indexedEdge = indexedEdge.copy(props = newProps)
-    indexEdgeSerializer(_indexedEdge).toKeyValues.map(_.copy(operation = SKeyValue.Increment))
+    indexEdgeSerializer(_indexedEdge).toKeyValues.map(_.copy(operation = SKeyValue.Increment, durability = _indexedEdge.label.durability))
   }
+
+  //TODO: ServiceColumn do not have durability property yet.
   def buildDeleteBelongsToId(vertex: Vertex): Seq[SKeyValue] = {
     val kvs = vertexSerializer(vertex).toKeyValues
     val kv = kvs.head
@@ -1383,10 +1107,23 @@ abstract class Storage[R](val graph: Graph,
     }
   }

+  def buildDegreePuts(edge: Edge, degreeVal: Long): Seq[SKeyValue] = {
+    val kvs = edge.edgesWithIndexValid.flatMap { _indexEdge =>
+      val newProps = Map(LabelMeta.degree -> InnerValLikeWithTs.withLong(degreeVal, _indexEdge.ts, _indexEdge.schemaVer))
+      val indexEdge = _indexEdge.copy(props = newProps)
+
+      indexEdgeSerializer(indexEdge).toKeyValues.map(_.copy(operation = SKeyValue.Put, durability = indexEdge.label.durability))
+    }
+
+    kvs
+  }
+
   def buildPutsAll(vertex: Vertex): Seq[SKeyValue] = {
     vertex.op match {
       case d: Byte if d == GraphUtil.operations("delete") => vertexSerializer(vertex).toKeyValues.map(_.copy(operation = SKeyValue.Delete))
       case _ => vertexSerializer(vertex).toKeyValues.map(_.copy(operation = SKeyValue.Put))
     }
   }
+
+  def info: Map[String, String] = Map("className" -> this.getClass.getSimpleName)
 }
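buildIncrementsAsync and the new buildDegreePuts both encode an edge's degree as a reserved property (LabelMeta.degree) on the serialized index edge and differ only in the operation stamped onto the resulting key-values: Increment for counter-style bumps, Put for an absolute overwrite. A compact sketch of that distinction, with SKeyValue, the serializer, and the degree qualifier all reduced to hypothetical stand-ins:

object DegreeSketch {
  sealed trait Op
  case object Put extends Op        // absolute overwrite
  case object Increment extends Op  // server-side counter bump

  // Minimal stand-in for a serialized key value.
  case class KV(row: String, qualifier: String, value: Long, op: Op)

  // Assumed serializer: one KV per index the edge participates in.
  private def serialize(edgeKey: String, indices: Seq[String], amount: Long): Seq[KV] =
    indices.map(idx => KV(s"$edgeKey/$idx", "_degree_", amount, Put))

  // cf. buildIncrementsAsync: bump the degree by `amount`.
  def buildIncrements(edgeKey: String, indices: Seq[String], amount: Long = 1L): Seq[KV] =
    serialize(edgeKey, indices, amount).map(_.copy(op = Increment))

  // cf. buildDegreePuts: overwrite the degree with `degreeVal`.
  def buildDegreePuts(edgeKey: String, indices: Seq[String], degreeVal: Long): Seq[KV] =
    serialize(edgeKey, indices, degreeVal).map(_.copy(op = Put))
}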
