http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2core/src/main/scala/org/apache/s2graph/core/Vertex.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/Vertex.scala b/s2core/src/main/scala/org/apache/s2graph/core/Vertex.scala index ede1127..0377bd8 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/Vertex.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/Vertex.scala @@ -19,10 +19,53 @@ package org.apache.s2graph.core +import org.apache.s2graph.core.JSONParser._ import org.apache.s2graph.core.mysqls.{ColumnMeta, Service, ServiceColumn} import org.apache.s2graph.core.types.{InnerVal, InnerValLike, SourceVertexId, VertexId} import play.api.libs.json.Json - +// +//object S2Vertex { +// def apply(graph: Graph, vertex: Vertex): S2Vertex = { +// S2Vertex(graph, +// vertex.serviceName, +// vertex.serviceColumn.columnName, +// vertex.innerIdVal, +// vertex.serviceColumn.innerValsToProps(vertex.props), +// vertex.ts, +// GraphUtil.fromOp(vertex.op) +// ) +// } +//} +// +//case class S2Vertex(graph: Graph, +// serviceName: String, +// columnName: String, +// id: Any, +// props: Map[String, Any] = Map.empty, +// ts: Long = System.currentTimeMillis(), +// operation: String = "insert") extends GraphElement { +// lazy val vertex = { +// val service = Service.findByName(serviceName).getOrElse(throw new RuntimeException(s"$serviceName is not found.")) +// val column = ServiceColumn.find(service.id.get, columnName).getOrElse(throw new RuntimeException(s"$columnName is not found.")) +// val op = GraphUtil.toOp(operation).getOrElse(throw new RuntimeException(s"$operation is not supported.")) +// +// val srcVertexId = VertexId(column.id.get, toInnerVal(id.toString, column.columnType, column.schemaVersion)) +// val propsInner = column.propsToInnerVals(props) ++ +// Map(ColumnMeta.timeStampSeq.toInt -> InnerVal.withLong(ts, column.schemaVersion)) +// +// Vertex(srcVertexId, ts, propsInner, op) +// } +// +// val uniqueId = (serviceName, columnName, id) +// +// override def isAsync: Boolean = vertex.isAsync +// +// override def toLogString(): String = vertex.toLogString() +// +// override def queueKey: String = vertex.queueKey +// +// override def queuePartitionKey: String = vertex.queuePartitionKey +//} case class Vertex(id: VertexId, ts: Long = System.currentTimeMillis(), props: Map[Int, InnerValLike] = Map.empty[Int, InnerValLike], @@ -31,10 +74,19 @@ case class Vertex(id: VertexId, val innerId = id.innerId + val innerIdVal = innerId.value + + lazy val properties = for { + (k, v) <- props + meta <- serviceColumn.metasMap.get(k) + } yield meta.name -> v.value + def schemaVer = serviceColumn.schemaVersion def serviceColumn = ServiceColumn.findById(id.colId) + def columnName = serviceColumn.columnName + def service = Service.findById(serviceColumn.serviceId) lazy val (hbaseZkAddr, hbaseTableName) = (service.cluster, service.hTableName) @@ -114,7 +166,21 @@ object Vertex { def isLabelId(propKey: Int): Boolean = propKey > Byte.MaxValue - // val emptyVertex = Vertex(new CompositeId(CompositeId.defaultColId, CompositeId.defaultInnerId, false, true), - // System.currentTimeMillis()) - def fromString(s: String): Option[Vertex] = Graph.toVertex(s) + def toVertex(serviceName: String, + columnName: String, + id: Any, + props: Map[String, Any] = Map.empty, + ts: Long = System.currentTimeMillis(), + operation: String = "insert"): Vertex = { + + val service = Service.findByName(serviceName).getOrElse(throw new RuntimeException(s"$serviceName is not found.")) + val column = ServiceColumn.find(service.id.get, columnName).getOrElse(throw new RuntimeException(s"$columnName is not found.")) + val op = GraphUtil.toOp(operation).getOrElse(throw new RuntimeException(s"$operation is not supported.")) + + val srcVertexId = VertexId(column.id.get, toInnerVal(id.toString, column.columnType, column.schemaVersion)) + val propsInner = column.propsToInnerVals(props) ++ + Map(ColumnMeta.timeStampSeq.toInt -> InnerVal.withLong(ts, column.schemaVersion)) + + new Vertex(srcVertexId, ts, propsInner, op) + } }
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Experiment.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Experiment.scala b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Experiment.scala index 9bd172d..cd825f4 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Experiment.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Experiment.scala @@ -25,7 +25,8 @@ import scalikejdbc._ import scala.util.Random object Experiment extends Model[Experiment] { - val impressionKey = "S2-Impression-Id" + val ImpressionKey = "S2-Impression-Id" + val ImpressionId = "Impression-Id" def apply(rs: WrappedResultSet): Experiment = { Experiment(rs.intOpt("id"), http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Label.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Label.scala b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Label.scala index 2734211..f7318f6 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Label.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Label.scala @@ -19,13 +19,15 @@ package org.apache.s2graph.core.mysqls +import java.util.Calendar import org.apache.s2graph.core.GraphExceptions.ModelNotFoundException import org.apache.s2graph.core.GraphUtil +import org.apache.s2graph.core.JSONParser._ import org.apache.s2graph.core.Management.JsonModel.{Index, Prop} +import org.apache.s2graph.core.types.{InnerValLike, InnerValLikeWithTs} import org.apache.s2graph.core.utils.logger -import org.apache.s2graph.core.JSONParser._ -import play.api.libs.json._ +import play.api.libs.json.{JsObject, JsValue, Json} import scalikejdbc._ object Label extends Model[Label] { @@ -48,6 +50,7 @@ object Label extends Model[Label] { Label.delete(id.get) } + def findByName(labelName: String, useCache: Boolean = true)(implicit session: DBSession = AutoSession): Option[Label] = { val cacheKey = "label=" + labelName lazy val labelOpt = @@ -292,11 +295,16 @@ case class Label(id: Option[Int], label: String, lazy val srcColumn = ServiceColumn.find(srcServiceId, srcColumnName).getOrElse(throw ModelNotFoundException("Source column not found")) lazy val tgtColumn = ServiceColumn.find(tgtServiceId, tgtColumnName).getOrElse(throw ModelNotFoundException("Target column not found")) - lazy val direction = if (isDirected) "out" else "undirected" lazy val defaultIndex = LabelIndex.findByLabelIdAndSeq(id.get, LabelIndex.DefaultSeq) //TODO: Make sure this is correct + +// lazy val metas = metas(useCache = true) lazy val indices = LabelIndex.findByLabelIdAll(id.get, useCache = true) + lazy val labelMetas = LabelMeta.findAllByLabelId(id.get, useCache = true) + lazy val labelMetaSet = labelMetas.toSet + lazy val labelMetaMap = (labelMetas ++ LabelMeta.reservedMetas).map(m => m.seq -> m).toMap + lazy val indicesMap = indices.map(idx => (idx.seq, idx)) toMap lazy val indexSeqsMap = indices.map(idx => (idx.metaSeqs, idx)) toMap lazy val indexNameMap = indices.map(idx => (idx.name, idx)) toMap @@ -327,14 +335,37 @@ case class Label(id: Option[Int], label: String, jsValue <- innerValToJsValue(toInnerVal(prop.defaultValue, prop.dataType, schemaVersion), prop.dataType) } yield prop.name -> jsValue).toMap + lazy val metaPropsDefaultMapInnerString = (for { + prop <- metaPropsInner if LabelMeta.isValidSeq(prop.seq) + innerVal = InnerValLikeWithTs(toInnerVal(prop.defaultValue, prop.dataType, schemaVersion), System.currentTimeMillis()) + } yield prop.name -> innerVal).toMap + lazy val metaPropsDefaultMapInner = (for { prop <- metaPropsInner if LabelMeta.isValidSeq(prop.seq) + innerVal = InnerValLikeWithTs(toInnerVal(prop.defaultValue, prop.dataType, schemaVersion), System.currentTimeMillis()) + } yield prop -> innerVal).toMap + lazy val metaPropsDefaultMapInnerSeq = metaPropsDefaultMapInner.toSeq + lazy val metaPropsJsValueWithDefault = (for { + prop <- metaProps if LabelMeta.isValidSeq(prop.seq) jsValue <- innerValToJsValue(toInnerVal(prop.defaultValue, prop.dataType, schemaVersion), prop.dataType) - } yield prop.name -> jsValue).toMap + } yield prop -> jsValue).toMap +// lazy val extraOptions = Model.extraOptions(Option("""{ +// "storage": { +// "s2graph.storage.backend": "rocks", +// "rocks.db.path": "/tmp/db" +// } +// }""")) lazy val extraOptions: Map[String, JsValue] = options match { case None => Map.empty - case Some(v) => Json.parse(v).asOpt[JsObject].map { obj => obj.fields.toMap }.getOrElse(Map.empty) + case Some(v) => + try { + Json.parse(v).asOpt[JsObject].map { obj => obj.fields.toMap }.getOrElse(Map.empty) + } catch { + case e: Exception => + logger.error(s"An error occurs while parsing the extra label option: ${label}", e) + Map.empty + } } def srcColumnWithDir(dir: Int) = { @@ -386,5 +417,23 @@ case class Label(id: Option[Int], label: String, ) + def propsToInnerValsWithTs(props: Map[String, Any], + ts: Long = System.currentTimeMillis()): Map[Byte, InnerValLikeWithTs] = { + for { + (k, v) <- props + labelMeta <- metaPropsInvMap.get(k) + innerVal = toInnerVal(v.toString, labelMeta.dataType, schemaVersion) + } yield labelMeta.seq -> InnerValLikeWithTs(innerVal, ts) + + } + + def innerValsWithTsToProps(props: Map[Byte, InnerValLikeWithTs]): Map[String, Any] = { + for { + (k, v) <- props + labelMeta <- metaPropsMap.get(k) + } yield { + labelMeta.name -> v.innerVal.value + } + } } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ServiceColumn.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ServiceColumn.scala b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ServiceColumn.scala index b68fa79..6fceabc 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ServiceColumn.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ServiceColumn.scala @@ -24,6 +24,8 @@ package org.apache.s2graph.core.mysqls */ import org.apache.s2graph.core.JSONParser +import org.apache.s2graph.core.JSONParser._ +import org.apache.s2graph.core.types.{InnerValLikeWithTs, InnerValLike} import play.api.libs.json.Json import scalikejdbc._ object ServiceColumn extends Model[ServiceColumn] { @@ -89,13 +91,40 @@ object ServiceColumn extends Model[ServiceColumn] { }) } } -case class ServiceColumn(id: Option[Int], serviceId: Int, columnName: String, columnType: String, schemaVersion: String) { +case class ServiceColumn(id: Option[Int], serviceId: Int, columnName: String, columnType: String, schemaVersion: String) { lazy val service = Service.findById(serviceId) lazy val metas = ColumnMeta.findAllByColumn(id.get) + lazy val metasMap = metas.map { meta => meta.seq.toInt -> meta } toMap lazy val metasInvMap = metas.map { meta => meta.name -> meta} toMap lazy val metaNamesMap = (ColumnMeta.lastModifiedAtColumn :: metas).map(x => (x.seq.toInt, x.name)) toMap lazy val toJson = Json.obj("serviceName" -> service.serviceName, "columnName" -> columnName, "columnType" -> columnType) + def propsToInnerVals(props: Map[String, Any]): Map[Int, InnerValLike] = { + for { + (k, v) <- props + labelMeta <- metasInvMap.get(k) + innerVal = toInnerVal(v.toString, labelMeta.dataType, schemaVersion) + } yield labelMeta.seq.toInt -> innerVal + } + + def innerValsToProps(props: Map[Int, InnerValLike]): Map[String, Any] = { + for { + (k, v) <- props + columnMeta <- metasMap.get(k) + } yield { + columnMeta.name -> v.value + } + } + + def innerValsWithTsToProps(props: Map[Int, InnerValLikeWithTs]): Map[String, Any] = { + for { + (k, v) <- props + columnMeta <- metasMap.get(k) + } yield { + columnMeta.name -> v.innerVal.value + } + } + } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2core/src/main/scala/org/apache/s2graph/core/parsers/WhereParser.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/parsers/WhereParser.scala b/s2core/src/main/scala/org/apache/s2graph/core/parsers/WhereParser.scala index 5920f3c..1c93667 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/parsers/WhereParser.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/parsers/WhereParser.scala @@ -22,7 +22,7 @@ package org.apache.s2graph.core.parsers import org.apache.s2graph.core.GraphExceptions.WhereParserException import org.apache.s2graph.core.mysqls.{Label, LabelMeta} import org.apache.s2graph.core.types.InnerValLike -import org.apache.s2graph.core.{Edge, GraphExceptions, JSONParser} +import org.apache.s2graph.core.Edge import org.apache.s2graph.core.JSONParser._ import scala.annotation.tailrec import scala.util.Try http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala b/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala index 52ee50d..d77ac7d 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala @@ -36,9 +36,11 @@ import scala.util.{Failure, Success, Try} object TemplateHelper { val findVar = """\"?\$\{(.*?)\}\"?""".r val num = """(next_day|next_hour|next_week|now)?\s*(-?\s*[0-9]+)?\s*(hour|day|week)?""".r + val hour = 60 * 60 * 1000L val day = hour * 24L val week = day * 7L + def calculate(now: Long, n: Int, unit: String): Long = { val duration = unit match { case "hour" | "HOUR" => n * hour @@ -72,12 +74,32 @@ object TemplateHelper { } } -class RequestParser(config: Config) { +object RequestParser { + type ExperimentParam = (JsObject, String, String, String, Option[String]) + val defaultLimit = 100 + + def toJsValues(jsValue: JsValue): List[JsValue] = { + jsValue match { + case obj: JsObject => List(obj) + case arr: JsArray => arr.as[List[JsValue]] + case _ => List.empty[JsValue] + } + } + + def jsToStr(js: JsValue): String = js match { + case JsString(s) => s + case _ => js.toString() + } + +} + +class RequestParser(graph: Graph) { import Management.JsonModel._ + import RequestParser._ - val hardLimit = 100000 - val defaultLimit = 100 + val config = graph.config + val hardLimit = config.getInt("query.hardlimit") val maxLimit = Int.MaxValue - 1 val DefaultRpcTimeout = config.getInt("hbase.rpc.timeout") val DefaultMaxAttempt = config.getInt("hbase.client.retries.number") @@ -106,13 +128,11 @@ class RequestParser(config: Config) { (labelOrderType.seq, value) } } + ret } - def extractInterval(label: Label, _jsValue: JsValue) = { - val replaced = TemplateHelper.replaceVariable(System.currentTimeMillis(), _jsValue.toString()) - val jsValue = Json.parse(replaced) - + def extractInterval(label: Label, jsValue: JsValue) = { def extractKv(js: JsValue) = js match { case JsObject(map) => map.toSeq case JsArray(arr) => arr.flatMap { @@ -135,15 +155,21 @@ class RequestParser(config: Config) { ret } - def extractDuration(label: Label, _jsValue: JsValue) = { - val replaced = TemplateHelper.replaceVariable(System.currentTimeMillis(), _jsValue.toString()) - val jsValue = Json.parse(replaced) - + def extractDuration(label: Label, jsValue: JsValue) = { for { js <- parseOption[JsObject](jsValue, "duration") } yield { - val minTs = parseOption[Long](js, "from").getOrElse(Long.MaxValue) - val maxTs = parseOption[Long](js, "to").getOrElse(Long.MinValue) + val minTs = (js \ "from").get match { + case JsString(s) => TemplateHelper.replaceVariable(System.currentTimeMillis(), s).toLong + case JsNumber(n) => n.toLong + case _ => Long.MinValue + } + + val maxTs = (js \ "to").get match { + case JsString(s) => TemplateHelper.replaceVariable(System.currentTimeMillis(), s).toLong + case JsNumber(n) => n.toLong + case _ => Long.MaxValue + } if (minTs > maxTs) { throw new BadQueryException("Duration error. Timestamp of From cannot be larger than To.") @@ -167,21 +193,24 @@ class RequestParser(config: Config) { } ret.map(_.toMap).getOrElse(Map.empty[Byte, InnerValLike]) } - + def extractWhere(label: Label, whereClauseOpt: Option[String]): Try[Where] = { whereClauseOpt match { case None => Success(WhereParser.success) - case Some(_where) => - val where = TemplateHelper.replaceVariable(System.currentTimeMillis(), _where) + case Some(where) => val whereParserKey = s"${label.label}_${where}" + parserCache.get(whereParserKey, new Callable[Try[Where]] { override def call(): Try[Where] = { - WhereParser(label).parse(where) match { + val _where = TemplateHelper.replaceVariable(System.currentTimeMillis(), where) + + WhereParser(label).parse(_where) match { case s@Success(_) => s case Failure(ex) => throw BadQueryException(ex.getMessage, ex) } } }) + } } @@ -194,28 +223,38 @@ class RequestParser(config: Config) { } yield { Vertex(SourceVertexId(serviceColumn.id.get, innerId), System.currentTimeMillis()) } - vertices.toSeq + + vertices } - def toMultiQuery(jsValue: JsValue, isEdgeQuery: Boolean = true): MultiQuery = { + def toMultiQuery(jsValue: JsValue, impIdOpt: Option[String]): MultiQuery = { val queries = for { queryJson <- (jsValue \ "queries").asOpt[Seq[JsValue]].getOrElse(Seq.empty) } yield { - toQuery(queryJson, isEdgeQuery) + toQuery(queryJson, impIdOpt = impIdOpt) } val weights = (jsValue \ "weights").asOpt[Seq[Double]].getOrElse(queries.map(_ => 1.0)) - MultiQuery(queries = queries, weights = weights, - queryOption = toQueryOption(jsValue), jsonQuery = jsValue) + MultiQuery(queries = queries, weights = weights, queryOption = toQueryOption(jsValue, impIdOpt)) } - def toQueryOption(jsValue: JsValue): QueryOption = { + def toQueryOption(jsValue: JsValue, impIdOpt: Option[String]): QueryOption = { val filterOutFields = (jsValue \ "filterOutFields").asOpt[List[String]].getOrElse(List(LabelMeta.to.name)) - val filterOutQuery = (jsValue \ "filterOut").asOpt[JsValue].map { v => toQuery(v) }.map { q => + val filterOutQuery = (jsValue \ "filterOut").asOpt[JsValue].map { v => toQuery(v, impIdOpt = impIdOpt) }.map { q => q.copy(queryOption = q.queryOption.copy(filterOutFields = filterOutFields)) } val removeCycle = (jsValue \ "removeCycle").asOpt[Boolean].getOrElse(true) val selectColumns = (jsValue \ "select").asOpt[List[String]].getOrElse(List.empty) - val groupByColumns = (jsValue \ "groupBy").asOpt[List[String]].getOrElse(List.empty) +// val groupByColumns = (jsValue \ "groupBy").asOpt[List[String]].getOrElse(List.empty) + val groupBy = (jsValue \ "groupBy").asOpt[JsValue].getOrElse(JsNull) match { + case obj: JsObject => + val keys = (obj \ "key").asOpt[Seq[String]].getOrElse(Nil) + val groupByLimit = (obj \ "limit").asOpt[Int].getOrElse(hardLimit) + GroupBy(keys, groupByLimit) + case arr: JsArray => + val keys = arr.asOpt[Seq[String]].getOrElse(Nil) + GroupBy(keys) + case _ => GroupBy.Empty + } val orderByColumns: List[(String, Boolean)] = (jsValue \ "orderBy").asOpt[List[JsObject]].map { jsLs => for { js <- jsLs @@ -238,7 +277,7 @@ class RequestParser(config: Config) { QueryOption(removeCycle = removeCycle, selectColumns = selectColumns, - groupByColumns = groupByColumns, + groupBy = groupBy, orderByColumns = orderByColumns, filterOutQuery = filterOutQuery, filterOutFields = filterOutFields, @@ -247,10 +286,12 @@ class RequestParser(config: Config) { limitOpt = limitOpt, returnAgg = returnAgg, scoreThreshold = scoreThreshold, - returnDegree = returnDegree + returnDegree = returnDegree, + impIdOpt = impIdOpt ) } - def toQuery(jsValue: JsValue, isEdgeQuery: Boolean = true): Query = { + + def toQuery(jsValue: JsValue, impIdOpt: Option[String]): Query = { try { val vertices = (for { @@ -274,7 +315,7 @@ class RequestParser(config: Config) { if (vertices.isEmpty) throw BadQueryException("srcVertices`s id is empty") val steps = parse[Vector[JsValue]](jsValue, "steps") - val queryOption = toQueryOption(jsValue) + val queryOption = toQueryOption(jsValue, impIdOpt) val querySteps = steps.zipWithIndex.map { case (step, stepIdx) => @@ -332,7 +373,7 @@ class RequestParser(config: Config) { } - val ret = Query(vertices, querySteps, queryOption, jsValue) + val ret = Query(vertices, querySteps, queryOption) // logger.debug(ret.toString) ret } catch { @@ -354,10 +395,8 @@ class RequestParser(config: Config) { val limit = { parseOption[Int](labelGroup, "limit") match { case None => defaultLimit - case Some(l) if l < 0 => maxLimit - case Some(l) if l >= 0 => - val default = hardLimit - Math.min(l, default) + case Some(l) if l < 0 => hardLimit + case Some(l) if l >= 0 => Math.min(l, hardLimit) } } val offset = parseOption[Int](labelGroup, "offset").getOrElse(0) @@ -405,7 +444,6 @@ class RequestParser(config: Config) { // FIXME: Order of command matter QueryParam(labelWithDir) .sample(sample) - .limit(offset, limit) .rank(RankParam(label.id.get, scoring)) .exclude(exclude) .include(include) @@ -413,6 +451,7 @@ class RequestParser(config: Config) { .has(hasFilter) .labelOrderSeq(indexSeq) .interval(interval) + .limit(offset, limit) .where(where) .duplicatePolicy(duplicate) .includeDegree(includeDegree) @@ -450,21 +489,8 @@ class RequestParser(config: Config) { } } - def toJsValues(jsValue: JsValue): List[JsValue] = { - jsValue match { - case obj: JsObject => List(obj) - case arr: JsArray => arr.as[List[JsValue]] - case _ => List.empty[JsValue] - } - } - - def jsToStr(js: JsValue): String = js match { - case JsString(s) => s - case _ => js.toString() - } - def parseBulkFormat(str: String): Seq[(GraphElement, String)] = { - val edgeStrs = str.split("\\n") + val edgeStrs = str.split("\\n").filterNot(_.isEmpty) val elementsWithTsv = for { edgeStr <- edgeStrs str <- GraphUtil.parseString(edgeStr) @@ -480,24 +506,23 @@ class RequestParser(config: Config) { } private def toEdgeWithTsv(jsValue: JsValue, operation: String): Seq[(Edge, String)] = { - val srcId = (jsValue \ "from").asOpt[JsValue].map(jsToStr) - val tgtId = (jsValue \ "to").asOpt[JsValue].map(jsToStr) - val srcIds = (jsValue \ "froms").asOpt[List[JsValue]].toList.flatMap(froms => froms.map(jsToStr)) ++ srcId - val tgtIds = (jsValue \ "tos").asOpt[List[JsValue]].toList.flatMap(tos => tos.map(jsToStr)) ++ tgtId + val srcIds = (jsValue \ "from").asOpt[JsValue].map(Seq(_)).getOrElse(Nil) ++ (jsValue \ "froms").asOpt[Seq[JsValue]].getOrElse(Nil) + val tgtIds = (jsValue \ "to").asOpt[JsValue].map(Seq(_)).getOrElse(Nil) ++ (jsValue \ "tos").asOpt[Seq[JsValue]].getOrElse(Nil) val label = parse[String](jsValue, "label") val timestamp = parse[Long](jsValue, "timestamp") - val direction = parseOption[String](jsValue, "direction").getOrElse("") - val props = (jsValue \ "props").asOpt[JsValue].getOrElse("{}") + val direction = parseOption[String](jsValue, "direction").getOrElse("out") + val propsJson = (jsValue \ "props").asOpt[JsObject].getOrElse(Json.obj()) for { - srcId <- srcIds - tgtId <- tgtIds + srcId <- srcIds.flatMap(jsValueToAny(_).toSeq) + tgtId <- tgtIds.flatMap(jsValueToAny(_).toSeq) } yield { - val edge = Management.toEdge(timestamp, operation, srcId, tgtId, label, direction, props.toString) + // val edge = Management.toEdge(graph, timestamp, operation, srcId, tgtId, label, direction, fromJsonToProperties(propsJson)) + val edge = Edge.toEdge(srcId, tgtId, label, direction, fromJsonToProperties(propsJson), ts = timestamp, operation = operation) val tsv = (jsValue \ "direction").asOpt[String] match { - case None => Seq(timestamp, operation, "e", srcId, tgtId, label, props).mkString("\t") - case Some(dir) => Seq(timestamp, operation, "e", srcId, tgtId, label, props, dir).mkString("\t") + case None => Seq(timestamp, operation, "e", srcId, tgtId, label, propsJson.toString).mkString("\t") + case Some(dir) => Seq(timestamp, operation, "e", srcId, tgtId, label, propsJson.toString, dir).mkString("\t") } (edge, tsv) @@ -513,8 +538,8 @@ class RequestParser(config: Config) { val ts = parseOption[Long](jsValue, "timestamp").getOrElse(System.currentTimeMillis()) val sName = if (serviceName.isEmpty) parse[String](jsValue, "serviceName") else serviceName.get val cName = if (columnName.isEmpty) parse[String](jsValue, "columnName") else columnName.get - val props = (jsValue \ "props").asOpt[JsObject].getOrElse(Json.obj()) - Management.toVertex(ts, operation, id.toString, sName, cName, props.toString) + val props = fromJsonToProperties((jsValue \ "props").asOpt[JsObject].getOrElse(Json.obj())) + Vertex.toVertex(sName, cName, id.toString, props, ts, operation) } def toPropElements(jsObj: JsValue) = Try { @@ -637,7 +662,7 @@ class RequestParser(config: Config) { def toDeleteParam(json: JsValue) = { val labelName = (json \ "label").as[String] - val labels = Label.findByName(labelName).map { l => Seq(l) }.getOrElse(Nil).filterNot(_.isAsync) + val labels = Label.findByName(labelName).map { l => Seq(l) }.getOrElse(Nil) val direction = (json \ "direction").asOpt[String].getOrElse("out") val ids = (json \ "ids").asOpt[List[JsValue]].getOrElse(Nil) @@ -645,4 +670,30 @@ class RequestParser(config: Config) { val vertices = toVertices(labelName, direction, ids) (labels, direction, ids, ts, vertices) } + + def toFetchAndDeleteParam(json: JsValue) = { + val labelName = (json \ "label").as[String] + val fromOpt = (json \ "from").asOpt[JsValue] + val toOpt = (json \ "to").asOpt[JsValue] + val direction = (json \ "direction").asOpt[String].getOrElse("out") + val indexOpt = (json \ "index").asOpt[String] + val propsOpt = (json \ "props").asOpt[JsObject] + (labelName, fromOpt, toOpt, direction, indexOpt, propsOpt) + } + + def parseExperiment(jsQuery: JsValue): Seq[ExperimentParam] = jsQuery.as[Seq[JsObject]].map { obj => + def _require(field: String) = throw new RuntimeException(s"${field} not found") + + val accessToken = (obj \ "accessToken").asOpt[String].getOrElse(_require("accessToken")) + val experimentName = (obj \ "experiment").asOpt[String].getOrElse(_require("experiment")) + val uuid = (obj \ "#uuid").get match { + case JsString(s) => s + case JsNumber(n) => n.toString + case _ => _require("#uuid") + } + val body = (obj \ "params").asOpt[JsObject].getOrElse(Json.obj()) + val impKeyOpt = (obj \ Experiment.ImpressionKey).asOpt[String] + + (body, accessToken, experimentName, uuid, impKeyOpt) + } } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2core/src/main/scala/org/apache/s2graph/core/rest/RestHandler.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/rest/RestHandler.scala b/s2core/src/main/scala/org/apache/s2graph/core/rest/RestHandler.scala index 55b3e79..4c77ad6 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/rest/RestHandler.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/rest/RestHandler.scala @@ -21,7 +21,8 @@ package org.apache.s2graph.core.rest import java.net.URL -import org.apache.s2graph.core.GraphExceptions.BadQueryException +import org.apache.s2graph.core.GraphExceptions.{BadQueryException, LabelNotExistException} +import org.apache.s2graph.core.JSONParser._ import org.apache.s2graph.core._ import org.apache.s2graph.core.mysqls.{Bucket, Experiment, Service} import org.apache.s2graph.core.utils.logger @@ -29,8 +30,21 @@ import play.api.libs.json._ import scala.concurrent.{ExecutionContext, Future} - object RestHandler { + trait CanLookup[A] { + def lookup(m: A, key: String): Option[String] + } + + object CanLookup { + implicit val oneTupleLookup = new CanLookup[(String, String)] { + override def lookup(m: (String, String), key: String) = + if (m._1 == key) Option(m._2) else None + } + implicit val hashMapLookup = new CanLookup[Map[String, String]] { + override def lookup(m: Map[String, String], key: String): Option[String] = m.get(key) + } + } + case class HandlerResult(body: Future[JsValue], headers: (String, String)*) } @@ -41,25 +55,31 @@ object RestHandler { class RestHandler(graph: Graph)(implicit ec: ExecutionContext) { import RestHandler._ - val requestParser = new RequestParser(graph.config) + val requestParser = new RequestParser(graph) + /** * Public APIS */ - def doPost(uri: String, body: String, impKeyOpt: => Option[String] = None): HandlerResult = { + def doPost[A](uri: String, body: String, headers: A)(implicit ev: CanLookup[A]): HandlerResult = { + val impKeyOpt = ev.lookup(headers, Experiment.ImpressionKey) + val impIdOpt = ev.lookup(headers, Experiment.ImpressionId) + try { val jsQuery = Json.parse(body) uri match { - case "/graphs/getEdges" => HandlerResult(getEdgesAsync(jsQuery)(PostProcess.toSimpleVertexArrJson)) - case "/graphs/getEdges/grouped" => HandlerResult(getEdgesAsync(jsQuery)(PostProcess.summarizeWithListFormatted)) - case "/graphs/getEdgesExcluded" => HandlerResult(getEdgesExcludedAsync(jsQuery)(PostProcess.toSimpleVertexArrJson)) - case "/graphs/getEdgesExcluded/grouped" => HandlerResult(getEdgesExcludedAsync(jsQuery)(PostProcess.summarizeWithListExcludeFormatted)) +// case "/graphs/getEdges" => HandlerResult(getEdgesAsync(jsQuery, impIdOpt)(PostProcess.toSimpleVertexArrJson)) + case "/graphs/getEdges" => HandlerResult(getEdgesAsync(jsQuery, impIdOpt)(PostProcess.toJson)) +// case "/graphs/getEdges/grouped" => HandlerResult(getEdgesAsync(jsQuery)(PostProcess.summarizeWithListFormatted)) +// case "/graphs/getEdgesExcluded" => HandlerResult(getEdgesExcludedAsync(jsQuery)(PostProcess.toSimpleVertexArrJson)) +// case "/graphs/getEdgesExcluded/grouped" => HandlerResult(getEdgesExcludedAsync(jsQuery)(PostProcess.summarizeWithListExcludeFormatted)) case "/graphs/checkEdges" => checkEdges(jsQuery) - case "/graphs/getEdgesGrouped" => HandlerResult(getEdgesAsync(jsQuery)(PostProcess.summarizeWithList)) - case "/graphs/getEdgesGroupedExcluded" => HandlerResult(getEdgesExcludedAsync(jsQuery)(PostProcess.summarizeWithListExclude)) - case "/graphs/getEdgesGroupedExcludedFormatted" => HandlerResult(getEdgesExcludedAsync(jsQuery)(PostProcess.summarizeWithListExcludeFormatted)) +// case "/graphs/getEdgesGrouped" => HandlerResult(getEdgesAsync(jsQuery)(PostProcess.summarizeWithList)) +// case "/graphs/getEdgesGroupedExcluded" => HandlerResult(getEdgesExcludedAsync(jsQuery)(PostProcess.summarizeWithListExclude)) +// case "/graphs/getEdgesGroupedExcludedFormatted" => HandlerResult(getEdgesExcludedAsync(jsQuery)(PostProcess.summarizeWithListExcludeFormatted)) case "/graphs/getVertices" => HandlerResult(getVertices(jsQuery)) + case "/graphs/experiments" => experiments(jsQuery) case uri if uri.startsWith("/graphs/experiment") => val Array(accessToken, experimentName, uuid) = uri.split("/").takeRight(3) experiment(jsQuery, accessToken, experimentName, uuid, impKeyOpt) @@ -75,17 +95,8 @@ class RestHandler(graph: Graph)(implicit ec: ExecutionContext) { try { val (quads, isReverted) = requestParser.toCheckEdgeParam(jsValue) - HandlerResult(graph.checkEdges(quads).map { case queryRequestWithResultLs => - val edgeJsons = for { - queryRequestWithResult <- queryRequestWithResultLs - (queryRequest, queryResult) = QueryRequestWithResult.unapply(queryRequestWithResult).get - edgeWithScore <- queryResult.edgeWithScoreLs - (edge, score) = EdgeWithScore.unapply(edgeWithScore).get - convertedEdge = if (isReverted) edge.duplicateEdge else edge - edgeJson = PostProcess.edgeToJson(convertedEdge, score, queryRequest.query, queryRequest.queryParam) - } yield Json.toJson(edgeJson) - - Json.toJson(edgeJsons) + HandlerResult(graph.checkEdges(quads).map { case stepResult => + PostProcess.toJson(graph, QueryOption(), stepResult) }) } catch { case e: Exception => HandlerResult(Future.failed(e)) @@ -93,11 +104,23 @@ class RestHandler(graph: Graph)(implicit ec: ExecutionContext) { } - /** - * Private APIS - */ - private def experiment(contentsBody: JsValue, accessToken: String, experimentName: String, uuid: String, impKeyOpt: => Option[String]): HandlerResult = { + private def experiments(jsQuery: JsValue): HandlerResult = { + val params: Seq[RequestParser.ExperimentParam] = requestParser.parseExperiment(jsQuery) + + val results = params map { case (body, token, experimentName, uuid, impKeyOpt) => + val handlerResult = experiment(body, token, experimentName, uuid, impKeyOpt) + val future = handlerResult.body.recover { + case e: Exception => PostProcess.emptyResults ++ Json.obj("error" -> Json.obj("reason" -> e.getMessage)) + } + + future + } + + val result = Future.sequence(results).map(JsArray) + HandlerResult(body = result) + } + private def experiment(contentsBody: JsValue, accessToken: String, experimentName: String, uuid: String, impKeyOpt: => Option[String] = None): HandlerResult = { try { val bucketOpt = for { service <- Service.findByAccessToken(accessToken) @@ -108,7 +131,7 @@ class RestHandler(graph: Graph)(implicit ec: ExecutionContext) { val bucket = bucketOpt.getOrElse(throw new RuntimeException("bucket is not found")) if (bucket.isGraphQuery) { val ret = buildRequestInner(contentsBody, bucket, uuid) - HandlerResult(ret.body, Experiment.impressionKey -> bucket.impressionId) + HandlerResult(ret.body, Experiment.ImpressionKey -> bucket.impressionId) } else throw new RuntimeException("not supported yet") } catch { @@ -127,119 +150,54 @@ class RestHandler(graph: Graph)(implicit ec: ExecutionContext) { val experimentLog = s"POST $path took -1 ms 200 -1 $body" logger.debug(experimentLog) - doPost(path, body) - } - } - - private def eachQuery(post: (Seq[QueryRequestWithResult], Seq[QueryRequestWithResult]) => JsValue)(q: Query): Future[JsValue] = { - val filterOutQueryResultsLs = q.filterOutQuery match { - case Some(filterOutQuery) => graph.getEdges(filterOutQuery) - case None => Future.successful(Seq.empty) - } - - for { - queryResultsLs <- graph.getEdges(q) - filterOutResultsLs <- filterOutQueryResultsLs - } yield { - val json = post(queryResultsLs, filterOutResultsLs) - json + doPost(path, body, Experiment.ImpressionId -> bucket.impressionId) } } - def getEdgesAsync(jsonQuery: JsValue) - (post: (Seq[QueryRequestWithResult], Seq[QueryRequestWithResult]) => JsValue): Future[JsValue] = { - - val fetch = eachQuery(post) _ + def getEdgesAsync(jsonQuery: JsValue, impIdOpt: Option[String] = None) + (post: (Graph, QueryOption, StepResult) => JsValue): Future[JsValue] = { jsonQuery match { - case JsArray(arr) => Future.traverse(arr.map(requestParser.toQuery(_)))(fetch).map(JsArray) case obj@JsObject(_) => (obj \ "queries").asOpt[JsValue] match { - case None => fetch(requestParser.toQuery(obj)) + case None => + val query = requestParser.toQuery(obj, impIdOpt) + graph.getEdges(query).map(post(graph, query.queryOption, _)) case _ => - val multiQuery = requestParser.toMultiQuery(obj) - val filterOutFuture = multiQuery.queryOption.filterOutQuery match { - case Some(filterOutQuery) => graph.getEdges(filterOutQuery) - case None => Future.successful(Seq.empty) - } - val futures = multiQuery.queries.zip(multiQuery.weights).map { case (query, weight) => - val filterOutQueryResultsLs = query.queryOption.filterOutQuery match { - case Some(filterOutQuery) => graph.getEdges(filterOutQuery) - case None => Future.successful(Seq.empty) - } - for { - queryRequestWithResultLs <- graph.getEdges(query) - filterOutResultsLs <- filterOutQueryResultsLs - } yield { - val newQueryRequestWithResult = for { - queryRequestWithResult <- queryRequestWithResultLs - queryResult = queryRequestWithResult.queryResult - } yield { - val newEdgesWithScores = for { - edgeWithScore <- queryRequestWithResult.queryResult.edgeWithScoreLs - } yield { - edgeWithScore.copy(score = edgeWithScore.score * weight) - } - queryRequestWithResult.copy(queryResult = queryResult.copy(edgeWithScoreLs = newEdgesWithScores)) - } - logger.debug(s"[Size]: ${newQueryRequestWithResult.map(_.queryResult.edgeWithScoreLs.size).sum}") - (newQueryRequestWithResult, filterOutResultsLs) - } - } - for { - filterOut <- filterOutFuture - resultWithExcludeLs <- Future.sequence(futures) - } yield { - PostProcess.toSimpleVertexArrJsonMulti(multiQuery.queryOption, resultWithExcludeLs, filterOut) - // val initial = (ListBuffer.empty[QueryRequestWithResult], ListBuffer.empty[QueryRequestWithResult]) - // val (results, excludes) = resultWithExcludeLs.foldLeft(initial) { case ((prevResults, prevExcludes), (results, excludes)) => - // (prevResults ++= results, prevExcludes ++= excludes) - // } - // PostProcess.toSimpleVertexArrJson(multiQuery.queryOption, results, excludes ++ filterOut) - } + val multiQuery = requestParser.toMultiQuery(obj, impIdOpt) + graph.getEdgesMultiQuery(multiQuery).map(post(graph, multiQuery.queryOption, _)) } - case _ => throw BadQueryException("Cannot support") - } - } - - private def getEdgesExcludedAsync(jsonQuery: JsValue) - (post: (Seq[QueryRequestWithResult], Seq[QueryRequestWithResult]) => JsValue): Future[JsValue] = { - val q = requestParser.toQuery(jsonQuery) - val filterOutQuery = Query(q.vertices, Vector(q.steps.last)) - val fetchFuture = graph.getEdges(q) - val excludeFuture = graph.getEdges(filterOutQuery) + case JsArray(arr) => + val queries = arr.map(requestParser.toQuery(_, impIdOpt)) + val weights = queries.map(_ => 1.0) + val multiQuery = MultiQuery(queries, weights, QueryOption(), jsonQuery) + graph.getEdgesMultiQuery(multiQuery).map(post(graph, multiQuery.queryOption, _)) - for { - queryResultLs <- fetchFuture - exclude <- excludeFuture - } yield { - post(queryResultLs, exclude) + case _ => throw BadQueryException("Cannot support") } } private def getVertices(jsValue: JsValue) = { val jsonQuery = jsValue - val ts = System.currentTimeMillis() - val props = "{}" val vertices = jsonQuery.as[List[JsValue]].flatMap { js => val serviceName = (js \ "serviceName").as[String] val columnName = (js \ "columnName").as[String] - for (id <- (js \ "ids").asOpt[List[JsValue]].getOrElse(List.empty[JsValue])) yield { - Management.toVertex(ts, "insert", id.toString, serviceName, columnName, props) + for { + idJson <- (js \ "ids").asOpt[List[JsValue]].getOrElse(List.empty[JsValue]) + id <- jsValueToAny(idJson) + } yield { + Vertex.toVertex(serviceName, columnName, id) } } graph.getVertices(vertices) map { vertices => PostProcess.verticesToJson(vertices) } } + private def buildRequestBody(requestKeyJsonOpt: Option[JsValue], bucket: Bucket, uuid: String): String = { var body = bucket.requestBody.replace("#uuid", uuid) - // // replace variable - // body = TemplateHelper.replaceVariable(System.currentTimeMillis(), body) - - // replace param for { requestKeyJson <- requestKeyJsonOpt jsObj <- requestKeyJson.asOpt[JsObject] http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala index eaa25af..a6e81b4 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala @@ -19,12 +19,10 @@ package org.apache.s2graph.core.storage -import java.util.concurrent.{TimeUnit, Executors} +import java.util.concurrent.{Executors, TimeUnit} import com.typesafe.config.Config import org.apache.hadoop.hbase.util.Bytes -import org.apache.kafka.clients.producer.ProducerRecord -import org.apache.s2graph.core.ExceptionHandler.{KafkaMessage, Key, Val} import org.apache.s2graph.core.GraphExceptions.FetchTimeoutException import org.apache.s2graph.core._ import org.apache.s2graph.core.mysqls.{Label, LabelMeta} @@ -37,13 +35,15 @@ import org.apache.s2graph.core.utils.{Extensions, logger} import scala.annotation.tailrec import scala.collection.Seq import scala.collection.mutable.ArrayBuffer -import scala.concurrent.{Promise, ExecutionContext, Future} +import scala.concurrent.{ExecutionContext, Future, Promise} import scala.util.{Random, Try} -abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) { +abstract class Storage[R](val graph: Graph, + val config: Config)(implicit ec: ExecutionContext) { import HBaseType._ /** storage dependent configurations */ + val DeleteAllFetchCount = config.getInt("delete.all.fetch.count") val MaxRetryNum = config.getInt("max.retry.number") val MaxBackOff = config.getInt("max.back.off") val BackoffTimeout = config.getInt("back.off.timeout") @@ -57,11 +57,15 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) { /** retry scheduler */ val scheduledThreadPool = Executors.newSingleThreadScheduledExecutor() + /** handle mutate failed */ val exceptionHandler = new ExceptionHandler(config) - val failTopic = s"mutateFailed_${config.getString("phase")}" + /** fallback */ + val fallback = Future.successful(StepResult.Empty) + val innerFallback = Future.successful(StepInnerResult.Empty) + /** * Compatibility table * | label schema version | snapshot edge | index edge | vertex | note | @@ -229,7 +233,7 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) { * @return */ def fetches(queryRequestWithScoreLs: Seq[(QueryRequest, Double)], - prevStepEdges: Map[VertexId, Seq[EdgeWithScore]]): Future[Seq[QueryRequestWithResult]] + prevStepEdges: Map[VertexId, Seq[EdgeWithScore]]): Future[Seq[StepInnerResult]] /** * fetch Vertex for given request from storage. @@ -324,7 +328,7 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) { def mutateEdges(edges: Seq[Edge], withWait: Boolean): Future[Seq[Boolean]] = { val (strongEdges, weakEdges) = - edges.partition(e => e.label.consistencyLevel == "strong" && e.op != GraphUtil.operations("insertBulk")) + (edges.partition(e => e.label.consistencyLevel == "strong" && e.op != GraphUtil.operations("insertBulk"))) val weakEdgesFutures = weakEdges.groupBy { e => e.label.hbaseZkAddr }.map { case (zkQuorum, edges) => val mutations = edges.flatMap { edge => @@ -449,7 +453,7 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) { future recoverWith { case FetchTimeoutException(retryEdge) => logger.info(s"[Try: $tryNum], Fetch fail.\n${retryEdge}") - /* fetch failed. re-fetch should be done */ + /** fetch failed. re-fetch should be done */ fetchSnapshotEdge(edges.head).flatMap { case (queryParam, snapshotEdgeOpt, kvOpt) => retry(tryNum + 1)(edges, statusCode, snapshotEdgeOpt) } @@ -465,14 +469,14 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) { } logger.info(s"[Try: $tryNum], [Status: $status] partial fail.\n${retryEdge.toLogString}\nFailReason: ${faileReason}") - /* retry logic */ + /** retry logic */ val promise = Promise[Boolean] val backOff = exponentialBackOff(tryNum) scheduledThreadPool.schedule(new Runnable { override def run(): Unit = { val future = if (failedStatusCode == 0) { // acquire Lock failed. other is mutating so this thead need to re-fetch snapshotEdge. - /* fetch failed. re-fetch should be done */ + /** fetch failed. re-fetch should be done */ fetchSnapshotEdge(edges.head).flatMap { case (queryParam, snapshotEdgeOpt, kvOpt) => retry(tryNum + 1)(edges, statusCode, snapshotEdgeOpt) } @@ -505,7 +509,7 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) { case 0 => fetchedSnapshotEdgeOpt match { case None => - /* + /** * no one has never mutated this SN. * (squashedEdge, edgeMutate) = Edge.buildOperation(None, edges) * pendingE = squashedEdge.copy(statusCode = 1, lockTs = now, version = squashedEdge.ts + 1) @@ -527,7 +531,7 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) { case Some(snapshotEdge) => snapshotEdge.pendingEdgeOpt match { case None => - /* + /** * others finished commit on this SN. but there is no contention. * (squashedEdge, edgeMutate) = Edge.buildOperation(snapshotEdgeOpt, edges) * pendingE = squashedEdge.copy(statusCode = 1, lockTs = now, version = snapshotEdge.version + 1) ? @@ -549,7 +553,7 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) { case Some(pendingEdge) => val isLockExpired = pendingEdge.lockTs.get + LockExpireDuration < System.currentTimeMillis() if (isLockExpired) { - /* + /** * if pendingEdge.ts == snapshotEdge.ts => * (squashedEdge, edgeMutate) = Edge.buildOperation(None, Seq(pendingEdge)) * else => @@ -571,7 +575,7 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) { commitProcess(statusCode, squashedEdge, fetchedSnapshotEdgeOpt, lockSnapshotEdge, releaseLockSnapshotEdge, edgeMutate) } else { - /* + /** * others finished commit on this SN and there is currently contention. * this can't be proceed so retry from re-fetch. * throw EX @@ -584,11 +588,11 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) { } case _ => - /* + /** * statusCode > 0 which means self locked and there has been partial failure either on mutate, increment, releaseLock */ - /* + /** * this succeed to lock this SN. keep doing on commit process. * if SN.isEmpty => * no one never succed to commit on this SN. @@ -828,90 +832,96 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) { /** Delete All */ - protected def deleteAllFetchedEdgesAsyncOld(queryRequest: QueryRequest, - queryResult: QueryResult, + protected def deleteAllFetchedEdgesAsyncOld(stepInnerResult: StepInnerResult, requestTs: Long, retryNum: Int): Future[Boolean] = { - val queryParam = queryRequest.queryParam - val zkQuorum = queryParam.label.hbaseZkAddr - val futures = for { - edgeWithScore <- queryResult.edgeWithScoreLs - (edge, score) = EdgeWithScore.unapply(edgeWithScore).get - } yield { - /* reverted direction */ - val reversedIndexedEdgesMutations = edge.duplicateEdge.edgesWithIndex.flatMap { indexEdge => - indexEdgeSerializer(indexEdge).toKeyValues.map(_.copy(operation = SKeyValue.Delete)) ++ - buildIncrementsAsync(indexEdge, -1L) - } - val reversedSnapshotEdgeMutations = snapshotEdgeSerializer(edge.toSnapshotEdge).toKeyValues.map(_.copy(operation = SKeyValue.Put)) - val forwardIndexedEdgeMutations = edge.edgesWithIndex.flatMap { indexEdge => - indexEdgeSerializer(indexEdge).toKeyValues.map(_.copy(operation = SKeyValue.Delete)) ++ - buildIncrementsAsync(indexEdge, -1L) + if (stepInnerResult.isEmpty) Future.successful(true) + else { + val head = stepInnerResult.edgesWithScoreLs.head + val zkQuorum = head.edge.label.hbaseZkAddr + val futures = for { + edgeWithScore <- stepInnerResult.edgesWithScoreLs + (edge, score) = EdgeWithScore.unapply(edgeWithScore).get + } yield { + /** reverted direction */ + val reversedIndexedEdgesMutations = edge.duplicateEdge.edgesWithIndex.flatMap { indexEdge => + indexEdgeSerializer(indexEdge).toKeyValues.map(_.copy(operation = SKeyValue.Delete)) ++ + buildIncrementsAsync(indexEdge, -1L) + } + val reversedSnapshotEdgeMutations = snapshotEdgeSerializer(edge.toSnapshotEdge).toKeyValues.map(_.copy(operation = SKeyValue.Put)) + val forwardIndexedEdgeMutations = edge.edgesWithIndex.flatMap { indexEdge => + indexEdgeSerializer(indexEdge).toKeyValues.map(_.copy(operation = SKeyValue.Delete)) ++ + buildIncrementsAsync(indexEdge, -1L) + } + val mutations = reversedIndexedEdgesMutations ++ reversedSnapshotEdgeMutations ++ forwardIndexedEdgeMutations + writeToStorage(zkQuorum, mutations, withWait = true) } - val mutations = reversedIndexedEdgesMutations ++ reversedSnapshotEdgeMutations ++ forwardIndexedEdgeMutations - writeToStorage(zkQuorum, mutations, withWait = true) - } - Future.sequence(futures).map { rets => rets.forall(identity) } + Future.sequence(futures).map { rets => rets.forall(identity) } + } } - protected def buildEdgesToDelete(queryRequestWithResultLs: QueryRequestWithResult, requestTs: Long): QueryResult = { - val (queryRequest, queryResult) = QueryRequestWithResult.unapply(queryRequestWithResultLs).get - val edgeWithScoreLs = queryResult.edgeWithScoreLs.filter { edgeWithScore => + protected def buildEdgesToDelete(stepInnerResult: StepInnerResult, requestTs: Long): StepInnerResult = { + val filtered = stepInnerResult.edgesWithScoreLs.filter { edgeWithScore => (edgeWithScore.edge.ts < requestTs) && !edgeWithScore.edge.isDegree - }.map { edgeWithScore => - val label = queryRequest.queryParam.label - val (newOp, newVersion, newPropsWithTs) = label.consistencyLevel match { - case "strong" => - val _newPropsWithTs = edgeWithScore.edge.propsWithTs ++ - Map(LabelMeta.timeStampSeq -> InnerValLikeWithTs.withLong(requestTs, requestTs, label.schemaVersion)) - (GraphUtil.operations("delete"), requestTs, _newPropsWithTs) - case _ => - val oldEdge = edgeWithScore.edge - (oldEdge.op, oldEdge.version, oldEdge.propsWithTs) - } + } + if (filtered.isEmpty) StepInnerResult.Empty + else { + val head = filtered.head + val label = head.edge.label + val edgeWithScoreLs = filtered.map { edgeWithScore => + val (newOp, newVersion, newPropsWithTs) = label.consistencyLevel match { + case "strong" => + val _newPropsWithTs = edgeWithScore.edge.propsWithTs ++ + Map(LabelMeta.timeStampSeq -> InnerValLikeWithTs.withLong(requestTs, requestTs, label.schemaVersion)) + (GraphUtil.operations("delete"), requestTs, _newPropsWithTs) + case _ => + val oldEdge = edgeWithScore.edge + (oldEdge.op, oldEdge.version, oldEdge.propsWithTs) + } - val copiedEdge = - edgeWithScore.edge.copy(op = newOp, version = newVersion, propsWithTs = newPropsWithTs) + val copiedEdge = + edgeWithScore.edge.copy(op = newOp, version = newVersion, propsWithTs = newPropsWithTs) - val edgeToDelete = edgeWithScore.copy(edge = copiedEdge) -// logger.debug(s"delete edge from deleteAll: ${edgeToDelete.edge.toLogString}") - edgeToDelete + val edgeToDelete = edgeWithScore.copy(edge = copiedEdge) + // logger.debug(s"delete edge from deleteAll: ${edgeToDelete.edge.toLogString}") + edgeToDelete + } + //Degree edge? + StepInnerResult(edgeWithScoreLs, Nil, false) } - - queryResult.copy(edgeWithScoreLs = edgeWithScoreLs) } - protected def deleteAllFetchedEdgesLs(queryRequestWithResultLs: Seq[QueryRequestWithResult], requestTs: Long): Future[(Boolean, Boolean)] = { - val queryResultLs = queryRequestWithResultLs.map(_.queryResult) - queryResultLs.foreach { queryResult => - if (queryResult.isFailure) throw new RuntimeException("fetched result is fallback.") + protected def deleteAllFetchedEdgesLs(stepInnerResultLs: Seq[StepInnerResult], + requestTs: Long): Future[(Boolean, Boolean)] = { + stepInnerResultLs.foreach { stepInnerResult => + if (stepInnerResult.isFailure) throw new RuntimeException("fetched result is fallback.") } val futures = for { - queryRequestWithResult <- queryRequestWithResultLs - (queryRequest, _) = QueryRequestWithResult.unapply(queryRequestWithResult).get - deleteQueryResult = buildEdgesToDelete(queryRequestWithResult, requestTs) - if deleteQueryResult.edgeWithScoreLs.nonEmpty + stepInnerResult <- stepInnerResultLs + deleteStepInnerResult = buildEdgesToDelete(stepInnerResult, requestTs) + if deleteStepInnerResult.edgesWithScoreLs.nonEmpty } yield { - val label = queryRequest.queryParam.label + val head = deleteStepInnerResult.edgesWithScoreLs.head + val label = head.edge.label label.schemaVersion match { case HBaseType.VERSION3 | HBaseType.VERSION4 => if (label.consistencyLevel == "strong") { - /* + /** * read: snapshotEdge on queryResult = O(N) * write: N x (relatedEdges x indices(indexedEdge) + 1(snapshotEdge)) */ - mutateEdges(deleteQueryResult.edgeWithScoreLs.map(_.edge), withWait = true).map(_.forall(identity)) + mutateEdges(deleteStepInnerResult.edgesWithScoreLs.map(_.edge), withWait = true).map(_.forall(identity)) } else { - deleteAllFetchedEdgesAsyncOld(queryRequest, deleteQueryResult, requestTs, MaxRetryNum) + deleteAllFetchedEdgesAsyncOld(deleteStepInnerResult, requestTs, MaxRetryNum) } case _ => - /* + /** * read: x * write: N x ((1(snapshotEdge) + 2(1 for incr, 1 for delete) x indices) */ - deleteAllFetchedEdgesAsyncOld(queryRequest, deleteQueryResult, requestTs, MaxRetryNum) + deleteAllFetchedEdgesAsyncOld(deleteStepInnerResult, requestTs, MaxRetryNum) } } @@ -923,10 +933,10 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) { } } - protected def fetchAndDeleteAll(query: Query, requestTs: Long): Future[(Boolean, Boolean)] = { + protected def fetchAndDeleteAll(queries: Seq[Query], requestTs: Long): Future[(Boolean, Boolean)] = { val future = for { - queryRequestWithResultLs <- getEdges(query) - (allDeleted, ret) <- deleteAllFetchedEdgesLs(queryRequestWithResultLs, requestTs) + stepInnerResultLs <- Future.sequence(queries.map(getEdgesStepInner(_))) + (allDeleted, ret) <- deleteAllFetchedEdgesLs(stepInnerResultLs, requestTs) } yield { // logger.debug(s"fetchAndDeleteAll: ${allDeleted}, ${ret}") (allDeleted, ret) @@ -960,19 +970,19 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) { } val requestTs = ts - val queryParams = for { + /** create query per label */ + val queries = for { label <- labels } yield { val labelWithDir = LabelWithDirection(label.id.get, dir) - QueryParam(labelWithDir).limit(0, DeleteAllFetchSize).duplicatePolicy(Option(Query.DuplicatePolicy.Raw)) + val queryParam = QueryParam(labelWithDir).limit(0, DeleteAllFetchSize).duplicatePolicy(Option(Query.DuplicatePolicy.Raw)) + val step = Step(List(queryParam)) + Query(srcVertices, Vector(step)) } - val step = Step(queryParams.toList) - val q = Query(srcVertices, Vector(step)) - // Extensions.retryOnSuccessWithBackoff(MaxRetryNum, Random.nextInt(MaxBackOff) + 1) { - val retryFuture = Extensions.retryOnSuccess(MaxRetryNum) { - fetchAndDeleteAll(q, requestTs) + val retryFuture = Extensions.retryOnSuccess(DeleteAllFetchCount) { + fetchAndDeleteAll(queries, requestTs) } { case (allDeleted, deleteSuccess) => allDeleted && deleteSuccess }.map { case (allDeleted, deleteSuccess) => allDeleted && deleteSuccess } @@ -1039,7 +1049,9 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) { queryParam: QueryParam, prevScore: Double = 1.0, isInnerCall: Boolean, - parentEdges: Seq[EdgeWithScore]): Seq[EdgeWithScore] = { + parentEdges: Seq[EdgeWithScore], + startOffset: Int = 0, + len: Int = Int.MaxValue): Seq[EdgeWithScore] = { if (kvs.isEmpty) Seq.empty else { val first = kvs.head @@ -1050,7 +1062,7 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) { else indexEdgeDeserializer(schemaVer).fromKeyValues(queryParam, Seq(kv), queryParam.label.schemaVersion, None) for { - kv <- kvs + (kv, idx) <- kvs.zipWithIndex if idx >= startOffset && idx < startOffset + len edge <- if (queryParam.isSnapshotEdge) toSnapshotEdge(kv, queryParam, None, isInnerCall, parentEdges) else toEdge(kv, queryParam, cacheElementOpt, parentEdges) @@ -1071,19 +1083,6 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) { /** End Of Parse Logic */ -// /** methods for consistency */ -// protected def writeAsyncSimple(zkQuorum: String, elementRpcs: Seq[SKeyValue], withWait: Boolean): Future[Boolean] = { -// if (elementRpcs.isEmpty) { -// Future.successful(true) -// } else { -// val futures = elementRpcs.map { rpc => writeToStorage(rpc, withWait) } -// Future.sequence(futures).map(_.forall(identity)) -// } -// } - - - // def futureCache[T] = Cache[Long, (Long, T)] - protected def toRequestEdge(queryRequest: QueryRequest): Edge = { val srcVertex = queryRequest.vertex // val tgtVertexOpt = queryRequest.tgtVertexOpt @@ -1095,7 +1094,7 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) { val (srcColumn, tgtColumn) = label.srcTgtColumn(labelWithDir.dir) val (srcInnerId, tgtInnerId) = tgtVertexIdOpt match { case Some(tgtVertexId) => // _to is given. - /* we use toSnapshotEdge so dont need to swap src, tgt */ + /** we use toSnapshotEdge so dont need to swap src, tgt */ val src = InnerVal.convertVersion(srcVertex.innerId, srcColumn.columnType, label.schemaVersion) val tgt = InnerVal.convertVersion(tgtVertexId, tgtColumn.columnType, label.schemaVersion) (src, tgt) @@ -1135,27 +1134,26 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) { } } - protected def fetchStep(orgQuery: Query, queryRequestWithResultsLs: Seq[QueryRequestWithResult]): Future[Seq[QueryRequestWithResult]] = { - if (queryRequestWithResultsLs.isEmpty) Future.successful(Nil) + protected def fetchStep(orgQuery: Query, + stepIdx: Int, + stepInnerResult: StepInnerResult): Future[StepInnerResult] = { + if (stepInnerResult.isEmpty) Future.successful(StepInnerResult.Empty) else { - val queryRequest = queryRequestWithResultsLs.head.queryRequest - val q = orgQuery - val queryResultsLs = queryRequestWithResultsLs.map(_.queryResult) + val edgeWithScoreLs = stepInnerResult.edgesWithScoreLs - val stepIdx = queryRequest.stepIdx + 1 + val q = orgQuery val prevStepOpt = if (stepIdx > 0) Option(q.steps(stepIdx - 1)) else None val prevStepThreshold = prevStepOpt.map(_.nextStepScoreThreshold).getOrElse(QueryParam.DefaultThreshold) val prevStepLimit = prevStepOpt.map(_.nextStepLimit).getOrElse(-1) val step = q.steps(stepIdx) + val alreadyVisited = if (stepIdx == 0) Map.empty[(LabelWithDirection, Vertex), Boolean] - else Graph.alreadyVisitedVertices(queryResultsLs) + else Graph.alreadyVisitedVertices(stepInnerResult.edgesWithScoreLs) - val groupedBy = queryResultsLs.flatMap { queryResult => - queryResult.edgeWithScoreLs.map { case edgeWithScore => - edgeWithScore.edge.tgtVertex -> edgeWithScore - } + val groupedBy = edgeWithScoreLs.map { case edgeWithScore => + edgeWithScore.edge.tgtVertex -> edgeWithScore }.groupBy { case (vertex, edgeWithScore) => vertex } val groupedByFiltered = for { @@ -1178,39 +1176,48 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) { queryParam <- step.queryParams } yield (QueryRequest(q, stepIdx, vertex, queryParam), prevStepScore) - Graph.filterEdges(fetches(queryRequests, prevStepTgtVertexIdEdges), alreadyVisited)(ec) + val fetchedLs = fetches(queryRequests, prevStepTgtVertexIdEdges) + Graph.filterEdges(orgQuery, stepIdx, queryRequests.map(_._1), fetchedLs, orgQuery.steps(stepIdx).queryParams, alreadyVisited)(ec) } } - - protected def fetchStepFuture(orgQuery: Query, queryRequestWithResultLsFuture: Future[Seq[QueryRequestWithResult]]): Future[Seq[QueryRequestWithResult]] = { - for { - queryRequestWithResultLs <- queryRequestWithResultLsFuture - ret <- fetchStep(orgQuery, queryRequestWithResultLs) - } yield ret + private def getEdgesStepInner(q: Query): Future[StepInnerResult] = { + Try { + if (q.steps.isEmpty) innerFallback + else { + // current stepIdx = -1 + val startStepInnerResult = QueryResult.fromVertices(q) + q.steps.zipWithIndex.foldLeft(Future.successful(startStepInnerResult)) { case (prevStepInnerResultFuture, (step, stepIdx)) => + for { + prevStepInnerResult <- prevStepInnerResultFuture + currentStepInnerResult <- fetchStep(q, stepIdx, prevStepInnerResult) + } yield currentStepInnerResult + } + } + } recover { + case e: Exception => + logger.error(s"getEdgesAsync: $e", e) + innerFallback + } get } - - def getEdges(q: Query): Future[Seq[QueryRequestWithResult]] = { - val fallback = { - val queryRequest = QueryRequest(query = q, stepIdx = 0, q.vertices.head, queryParam = QueryParam.Empty) - Future.successful(q.vertices.map(v => QueryRequestWithResult(queryRequest, QueryResult()))) - } + def getEdges(q: Query): Future[StepResult] = { Try { - if (q.steps.isEmpty) { // TODO: this should be get vertex query. fallback } else { - // current stepIdx = -1 - val startQueryResultLs = QueryResult.fromVertices(q) - q.steps.foldLeft(Future.successful(startQueryResultLs)) { case (acc, step) => - fetchStepFuture(q, acc) -// fetchStepFuture(q, acc).map { stepResults => -// step.queryParams.zip(stepResults).foreach { case (qParam, queryRequestWithResult) => -// val cursor = Base64.getEncoder.encodeToString(queryRequestWithResult.queryResult.tailCursor) -// qParam.cursorOpt = Option(cursor) -// } -// stepResults -// } + val filterOutFuture = q.queryOption.filterOutQuery match { + case None => innerFallback + case Some(filterOutQuery) => getEdgesStepInner(filterOutQuery) + } + for { + innerResult <- getEdgesStepInner(q) + filterOutInnerResult <- filterOutFuture + } yield { + val result = StepResult(graph, q.queryOption, innerResult) + if (filterOutInnerResult.isEmpty) result + else { + StepResult.filterOut(graph, q.queryOption, result, filterOutInnerResult) + } } } } recover { @@ -1220,7 +1227,34 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) { } get } - def checkEdges(params: Seq[(Vertex, Vertex, QueryParam)]): Future[Seq[QueryRequestWithResult]] = { + def getEdgesMultiQuery(mq: MultiQuery): Future[StepResult] = { + val fallback = Future.successful(StepResult.Empty) + + Try { + if (mq.queries.isEmpty) fallback + else { + val filterOutFuture = mq.queryOption.filterOutQuery match { + case None => innerFallback + case Some(filterOutQuery) => getEdgesStepInner(filterOutQuery) + } + + val multiQueryFutures = Future.sequence(mq.queries.map { query => getEdges(query) }) + for { + multiQueryResults <- multiQueryFutures + filterOutInnerResult <- filterOutFuture + } yield { + val merged = StepResult.merges(mq.queryOption, multiQueryResults, mq.weights) + StepResult.filterOut(graph, mq.queryOption, merged, filterOutInnerResult) + } + } + } recover { + case e: Exception => + logger.error(s"getEdgesAsync: $e", e) + fallback + } get + } + + def checkEdges(params: Seq[(Vertex, Vertex, QueryParam)]): Future[StepResult] = { val ts = System.currentTimeMillis() val futures = for { (srcVertex, tgtVertex, queryParam) <- params @@ -1228,15 +1262,18 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) { edge = Edge(srcVertex, tgtVertex, queryParam.labelWithDir, propsWithTs = propsWithTs) } yield { fetchSnapshotEdge(edge).map { case (queryParam, edgeOpt, kvOpt) => - val _queryParam = queryParam.tgtVertexInnerIdOpt(Option(edge.tgtVertex.innerId)) - val q = Query.toQuery(Seq(edge.srcVertex), _queryParam) - val queryRequest = QueryRequest(q, 0, edge.srcVertex, _queryParam) - val queryResult = QueryResult(edgeOpt.toSeq.map(e => EdgeWithScore(e, 1.0))) - QueryRequestWithResult(queryRequest, queryResult) + edgeOpt.toSeq.map(e => EdgeWithScore(e, 1.0)) } } - Future.sequence(futures) + Future.sequence(futures).map { edgeWithScoreLs => + val s2EdgeWithScoreLs = edgeWithScoreLs.flatMap { ls => + ls.map { edgeWithScore => + S2EdgeWithScore(edgeWithScore.edge, edgeWithScore.score) + } + } + StepResult(results = s2EdgeWithScoreLs, grouped = Nil, degreeEdges = Nil) + } } @@ -1266,6 +1303,13 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) { } } + + protected def normalize(edgeWithScores: Seq[EdgeWithScore]): Seq[EdgeWithScore] = { + val sum = edgeWithScores.foldLeft(0.0) { case (acc, cur) => acc + cur.score } + edgeWithScores.map { edgeWithScore => + edgeWithScore.copy(score = edgeWithScore.score / sum) + } + } /** end of query */ /** Mutation Builder */ @@ -1290,19 +1334,19 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) { (edgeMutate.edgesToDelete.isEmpty, edgeMutate.edgesToInsert.isEmpty) match { case (true, true) => - /* when there is no need to update. shouldUpdate == false */ + /** when there is no need to update. shouldUpdate == false */ List.empty case (true, false) => - /* no edges to delete but there is new edges to insert so increase degree by 1 */ + /** no edges to delete but there is new edges to insert so increase degree by 1 */ edgeMutate.edgesToInsert.flatMap { e => buildIncrementsAsync(e) } case (false, true) => - /* no edges to insert but there is old edges to delete so decrease degree by 1 */ + /** no edges to insert but there is old edges to delete so decrease degree by 1 */ edgeMutate.edgesToDelete.flatMap { e => buildIncrementsAsync(e, -1L) } case (false, false) => - /* update on existing edges so no change on degree */ + /** update on existing edges so no change on degree */ List.empty } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala index 138216b..b52ba53 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala @@ -82,8 +82,9 @@ object AsynchbaseStorage { } -class AsynchbaseStorage(override val config: Config)(implicit ec: ExecutionContext) - extends Storage[Deferred[QueryRequestWithResult]](config) { +class AsynchbaseStorage(override val graph: Graph, + override val config: Config)(implicit ec: ExecutionContext) + extends Storage[Deferred[StepInnerResult]](graph, config) { import Extensions.DeferOps @@ -100,14 +101,16 @@ class AsynchbaseStorage(override val config: Config)(implicit ec: ExecutionConte private val emptyKeyValues = new util.ArrayList[KeyValue]() private def client(withWait: Boolean): HBaseClient = if (withWait) clientWithFlush else client - import CanDefer._ - /** Future Cache to squash request */ - private val futureCache = new DeferCache[QueryResult, Deferred, Deferred](config, QueryResult(), "FutureCache", useMetric = true) + private val futureCache = new DeferCache[StepInnerResult, Deferred, Deferred](config, StepInnerResult.Empty, "FutureCache", useMetric = true) /** Simple Vertex Cache */ private val vertexCache = new DeferCache[Seq[SKeyValue], Promise, Future](config, Seq.empty[SKeyValue]) + private val zkQuorum = config.getString("hbase.zookeeper.quorum") + private val zkQuorumSlave = + if (config.hasPath("hbase.zookeeper.quorum")) Option(config.getString("hbase.zookeeper.quorum")) + else None /** * fire rpcs into proper hbase cluster using client and @@ -241,7 +244,7 @@ class AsynchbaseStorage(override val config: Config)(implicit ec: ExecutionConte if (queryParam.limit == Int.MinValue) logger.debug(s"MinValue: $queryParam") scanner.setMaxVersions(1) - scanner.setMaxNumRows(queryParam.limit) + scanner.setMaxNumRows(queryParam.offset + queryParam.limit) scanner.setMaxTimestamp(maxTs) scanner.setMinTimestamp(minTs) scanner.setRpcTimeout(queryParam.rpcTimeoutInMillis) @@ -280,21 +283,38 @@ class AsynchbaseStorage(override val config: Config)(implicit ec: ExecutionConte override def fetch(queryRequest: QueryRequest, prevStepScore: Double, isInnerCall: Boolean, - parentEdges: Seq[EdgeWithScore]): Deferred[QueryRequestWithResult] = { + parentEdges: Seq[EdgeWithScore]): Deferred[StepInnerResult] = { + + def fetchInner(hbaseRpc: AnyRef): Deferred[StepInnerResult] = { + val queryParam = queryRequest.queryParam - def fetchInner(hbaseRpc: AnyRef): Deferred[QueryResult] = { fetchKeyValuesInner(hbaseRpc).withCallback { kvs => - val edgeWithScores = toEdges(kvs, queryRequest.queryParam, prevStepScore, isInnerCall, parentEdges) - val resultEdgesWithScores = if (queryRequest.queryParam.sample >= 0) { - sample(queryRequest, edgeWithScores, queryRequest.queryParam.sample) - } else edgeWithScores - QueryResult(resultEdgesWithScores, tailCursor = kvs.lastOption.map(_.key).getOrElse(Array.empty[Byte])) -// QueryRequestWithResult(queryRequest, QueryResult(resultEdgesWithScores, tailCursor = kvs.lastOption.map(_.key).getOrElse(Array.empty))) + val (startOffset, length) = queryParam.label.schemaVersion match { + case HBaseType.VERSION4 => (queryParam.offset, queryParam.limit) + case _ => (0, kvs.length) + } + + val edgeWithScores = toEdges(kvs, queryParam, prevStepScore, isInnerCall, parentEdges, startOffset, length) + if (edgeWithScores.isEmpty) StepInnerResult.Empty + else { + val head = edgeWithScores.head + val (degreeEdges, indexEdges) = + if (head.edge.isDegree) (Seq(head), edgeWithScores.tail) + else (Nil, edgeWithScores) + val normalized = + if (queryRequest.queryParam.shouldNormalize) normalize(indexEdges) + else indexEdges + + val sampled = if (queryRequest.queryParam.sample >= 0) { + sample(queryRequest, normalized, queryRequest.queryParam.sample) + } else normalized + + StepInnerResult(edgesWithScoreLs = sampled, degreeEdges) + } } recoverWith { ex => logger.error(s"fetchInner failed. fallback return. $hbaseRpc}", ex) - QueryResult(isFailure = true) -// QueryRequestWithResult(queryRequest, QueryResult(isFailure = true)) + StepInnerResult.Failure } } @@ -302,27 +322,25 @@ class AsynchbaseStorage(override val config: Config)(implicit ec: ExecutionConte val cacheTTL = queryParam.cacheTTLInMillis val request = buildRequest(queryRequest) - val defer = - if (cacheTTL <= 0) fetchInner(request) - else { - val cacheKeyBytes = Bytes.add(queryRequest.query.cacheKeyBytes, toCacheKeyBytes(request)) - val cacheKey = queryParam.toCacheKey(cacheKeyBytes) - futureCache.getOrElseUpdate(cacheKey, cacheTTL)(fetchInner(request)) + if (cacheTTL <= 0) fetchInner(request) + else { + val cacheKeyBytes = Bytes.add(queryRequest.query.cacheKeyBytes, toCacheKeyBytes(request)) + val cacheKey = queryParam.toCacheKey(cacheKeyBytes) + futureCache.getOrElseUpdate(cacheKey, cacheTTL)(fetchInner(request)) } - defer withCallback { queryResult => QueryRequestWithResult(queryRequest, queryResult)} } override def fetches(queryRequestWithScoreLs: scala.Seq[(QueryRequest, Double)], - prevStepEdges: Predef.Map[VertexId, scala.Seq[EdgeWithScore]]): Future[scala.Seq[QueryRequestWithResult]] = { - val defers: Seq[Deferred[QueryRequestWithResult]] = for { + prevStepEdges: Predef.Map[VertexId, scala.Seq[EdgeWithScore]]): Future[scala.Seq[StepInnerResult]] = { + val defers: Seq[Deferred[StepInnerResult]] = for { (queryRequest, prevStepScore) <- queryRequestWithScoreLs parentEdges <- prevStepEdges.get(queryRequest.vertex.id) } yield fetch(queryRequest, prevStepScore, isInnerCall = false, parentEdges) - val grouped: Deferred[util.ArrayList[QueryRequestWithResult]] = Deferred.group(defers) + val grouped: Deferred[util.ArrayList[StepInnerResult]] = Deferred.group(defers) grouped withCallback { - queryResults: util.ArrayList[QueryRequestWithResult] => + queryResults: util.ArrayList[StepInnerResult] => queryResults.toIndexedSeq } toFuture } @@ -371,47 +389,56 @@ class AsynchbaseStorage(override val config: Config)(implicit ec: ExecutionConte } - override def createTable(zkAddr: String, + override def createTable(_zkAddr: String, tableName: String, cfs: List[String], regionMultiplier: Int, ttl: Option[Int], compressionAlgorithm: String): Unit = { - logger.info(s"create table: $tableName on $zkAddr, $cfs, $regionMultiplier, $compressionAlgorithm") - val admin = getAdmin(zkAddr) - val regionCount = admin.getClusterStatus.getServersSize * regionMultiplier - try { - if (!admin.tableExists(TableName.valueOf(tableName))) { - try { - val desc = new HTableDescriptor(TableName.valueOf(tableName)) - desc.setDurability(Durability.ASYNC_WAL) - for (cf <- cfs) { - val columnDesc = new HColumnDescriptor(cf) - .setCompressionType(Algorithm.valueOf(compressionAlgorithm.toUpperCase)) - .setBloomFilterType(BloomType.ROW) - .setDataBlockEncoding(DataBlockEncoding.FAST_DIFF) - .setMaxVersions(1) - .setTimeToLive(2147483647) - .setMinVersions(0) - .setBlocksize(32768) - .setBlockCacheEnabled(true) - if (ttl.isDefined) columnDesc.setTimeToLive(ttl.get) - desc.addFamily(columnDesc) - } + /** TODO: Decide if we will allow each app server to connect to multiple hbase cluster */ + for { + zkAddr <- Seq(zkQuorum) ++ zkQuorumSlave.toSeq + } { + logger.info(s"create table: $tableName on $zkAddr, $cfs, $regionMultiplier, $compressionAlgorithm") + val admin = getAdmin(zkAddr) + val regionCount = admin.getClusterStatus.getServersSize * regionMultiplier + try { + if (!admin.tableExists(TableName.valueOf(tableName))) { + try { + val desc = new HTableDescriptor(TableName.valueOf(tableName)) + desc.setDurability(Durability.ASYNC_WAL) + for (cf <- cfs) { + val columnDesc = new HColumnDescriptor(cf) + .setCompressionType(Algorithm.valueOf(compressionAlgorithm.toUpperCase)) + .setBloomFilterType(BloomType.ROW) + .setDataBlockEncoding(DataBlockEncoding.FAST_DIFF) + .setMaxVersions(1) + .setTimeToLive(2147483647) + .setMinVersions(0) + .setBlocksize(32768) + .setBlockCacheEnabled(true) + if (ttl.isDefined) columnDesc.setTimeToLive(ttl.get) + desc.addFamily(columnDesc) + } - if (regionCount <= 1) admin.createTable(desc) - else admin.createTable(desc, getStartKey(regionCount), getEndKey(regionCount), regionCount) - } catch { - case e: Throwable => - logger.error(s"$zkAddr, $tableName failed with $e", e) - throw e + if (regionCount <= 1) admin.createTable(desc) + else admin.createTable(desc, getStartKey(regionCount), getEndKey(regionCount), regionCount) + } catch { + case e: Throwable => + logger.error(s"$zkAddr, $tableName failed with $e", e) + throw e + } + } else { + logger.info(s"$zkAddr, $tableName, $cfs already exist.") } - } else { - logger.info(s"$zkAddr, $tableName, $cfs already exist.") + } catch { + case e: Throwable => + logger.error(s"$zkAddr, $tableName failed with $e", e) + throw e + } finally { + admin.close() + admin.getConnection.close() } - } finally { - admin.close() - admin.getConnection.close() } } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeSerializable.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeSerializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeSerializable.scala index 83d4338..c700e53 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeSerializable.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeSerializable.scala @@ -63,5 +63,5 @@ class IndexEdgeSerializable(indexEdge: IndexEdge) extends Serializable[IndexEdge else if (indexEdge.op == GraphUtil.operations("incrementCount")) Bytes.toBytes(indexEdge.props(LabelMeta.countSeq).innerVal.toString().toLong) else propsToKeyValues(indexEdge.metas.toSeq) - + } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2core/src/main/scala/org/apache/s2graph/core/utils/Logger.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/utils/Logger.scala b/s2core/src/main/scala/org/apache/s2graph/core/utils/Logger.scala index 4149540..b402c0f 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/utils/Logger.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/utils/Logger.scala @@ -51,6 +51,8 @@ object logger { private val logger = LoggerFactory.getLogger("application") private val errorLogger = LoggerFactory.getLogger("error") private val metricLogger = LoggerFactory.getLogger("metrics") + private val queryLogger = LoggerFactory.getLogger("query") + private val malformedLogger = LoggerFactory.getLogger("malformed") def metric[T: Loggable](msg: => T) = metricLogger.info(implicitly[Loggable[T]].toLogMessage(msg)) @@ -61,6 +63,10 @@ object logger { def error[T: Loggable](msg: => T, exception: => Throwable) = errorLogger.error(implicitly[Loggable[T]].toLogMessage(msg), exception) def error[T: Loggable](msg: => T) = errorLogger.error(implicitly[Loggable[T]].toLogMessage(msg)) + + def query[T: Loggable](msg: => T) = queryLogger.info(implicitly[Loggable[T]].toLogMessage(msg)) + + def malformed[T: Loggable](msg: => T, exception: => Throwable) = malformedLogger.error(implicitly[Loggable[T]].toLogMessage(msg), exception) } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8dbb9a3e/s2core/src/test/scala/org/apache/s2graph/core/EdgeTest.scala ---------------------------------------------------------------------- diff --git a/s2core/src/test/scala/org/apache/s2graph/core/EdgeTest.scala b/s2core/src/test/scala/org/apache/s2graph/core/EdgeTest.scala index a018c01..6933320 100644 --- a/s2core/src/test/scala/org/apache/s2graph/core/EdgeTest.scala +++ b/s2core/src/test/scala/org/apache/s2graph/core/EdgeTest.scala @@ -19,10 +19,12 @@ package org.apache.s2graph.core +import org.apache.s2graph.core.JSONParser._ import org.apache.s2graph.core.mysqls.LabelMeta import org.apache.s2graph.core.types.{InnerVal, InnerValLikeWithTs, VertexId} import org.apache.s2graph.core.utils.logger import org.scalatest.FunSuite +import play.api.libs.json.{JsObject, Json} class EdgeTest extends FunSuite with TestCommon with TestCommonWithModels { initTests() @@ -39,7 +41,8 @@ class EdgeTest extends FunSuite with TestCommon with TestCommonWithModels { val (srcId, tgtId, labelName) = ("1", "2", testLabelName) val bulkEdge = (for ((ts, op, props) <- bulkQueries) yield { - Management.toEdge(ts.toLong, op, srcId, tgtId, labelName, "out", props).toLogString + val properties = fromJsonToProperties(Json.parse(props).as[JsObject]) + Edge.toEdge(srcId, tgtId, labelName, "out", properties, ts.toLong, op).toLogString }).mkString("\n") val expected = Seq(
