// s2core/src/main/scala/org/apache/s2graph/core/PostProcess.scala
// (reconstructed from commit b8a15217 diff; '+' markers and mail headers removed)
package org.apache.s2graph.core

import org.apache.s2graph.core.GraphExceptions.BadQueryException
import org.apache.s2graph.core.mysqls.{ColumnMeta, Label, LabelMeta, ServiceColumn}
import org.apache.s2graph.core.types.{InnerVal, InnerValLike}
import play.api.libs.json.{Json, _}

import scala.collection.mutable.ListBuffer

/** Turns query results (edges/vertices with scores) into the JSON payloads returned to clients. */
object PostProcess extends JSONParser {

  /** A single edge rendered as JSON field-name -> value pairs. */
  type EDGE_VALUES = Map[String, JsValue]
  /** Up to four order-by values, padded with None (see buildRawEdges). */
  type ORDER_BY_VALUES = (Any, Any, Any, Any)
  /** (rendered edge, weighted score, order-by tuple). */
  type RAW_EDGE = (EDGE_VALUES, Double, ORDER_BY_VALUES)

  /** Result Entity score field name */
  val emptyDegrees = Seq.empty[JsValue]
  val timeoutResults = Json.obj("size" -> 0, "degrees" -> Json.arr(), "results" -> Json.arr(), "isTimeout" -> true)
  val emptyResults = Json.obj("size" -> 0, "degrees" -> Json.arr(), "results" -> Json.arr(), "isEmpty" -> true)

  /** Error payload for a failed request; BadQueryException carries its own user-facing msg. */
  def badRequestResults(ex: => Exception) = ex match {
    case ex: BadQueryException => Json.obj("message" -> ex.msg)
    case _ => Json.obj("message" -> ex.getMessage)
  }

  val SCORE_FIELD_NAME = "scoreSum"
  val reservedColumns = Set("cacheRemain", "from", "to", "label", "direction", "_timestamp", "timestamp", "score", "props")

  /**
   * Groups matched edges by (column, label, target vertex), dropping excluded edges and
   * degree edges, and renders one aggregate JSON object per group with a summed score.
   */
  def groupEdgeResult(queryRequestWithResultLs: Seq[QueryRequestWithResult], exclude: Seq[QueryRequestWithResult]) = {
    val excludeIds = resultInnerIds(exclude).map(innerId => innerId -> true).toMap
    // filterNot {case (edge, score) => edge.props.contains(LabelMeta.degreeSeq)}
    val groupedEdgesWithRank = (for {
      queryRequestWithResult <- queryRequestWithResultLs
      (queryRequest, queryResult) = QueryRequestWithResult.unapply(queryRequestWithResult).get
      edgeWithScore <- queryResult.edgeWithScoreLs
      (edge, score) = EdgeWithScore.unapply(edgeWithScore).get
      if !excludeIds.contains(toHashKey(edge, queryRequest.queryParam, queryRequest.query.filterOutFields))
    } yield {
      (queryRequest.queryParam, edge, score)
    }).groupBy {
      // "in" direction swaps which side counts as the grouping (src vs tgt) column
      case (queryParam, edge, rank) if edge.labelWithDir.dir == GraphUtil.directions("in") =>
        (queryParam.label.srcColumn, queryParam.label.label, queryParam.label.tgtColumn, edge.tgtVertex.innerId, edge.isDegree)
      case (queryParam, edge, rank) =>
        (queryParam.label.tgtColumn, queryParam.label.label, queryParam.label.srcColumn, edge.tgtVertex.innerId, edge.isDegree)
    }

    val ret = for {
      ((tgtColumn, labelName, srcColumn, target, isDegreeEdge), edgesAndRanks) <- groupedEdgesWithRank if !isDegreeEdge
      // keep one (queryParam, edge, score) per distinct source vertex
      edgesWithRanks = edgesAndRanks.groupBy(x => x._2.srcVertex).map(_._2.head)
      id <- innerValToJsValue(target, tgtColumn.columnType)
    } yield {
      Json.obj("name" -> tgtColumn.columnName, "id" -> id,
        SCORE_FIELD_NAME -> edgesWithRanks.map(_._3).sum,
        "label" -> labelName,
        "aggr" -> Json.obj(
          "name" -> srcColumn.columnName,
          "ids" -> edgesWithRanks.flatMap { case (queryParam, edge, rank) =>
            innerValToJsValue(edge.srcVertex.innerId, srcColumn.columnType)
          },
          "edges" -> edgesWithRanks.map { case (queryParam, edge, rank) =>
            Json.obj("id" -> innerValToJsValue(edge.srcVertex.innerId, srcColumn.columnType),
              "props" -> propsToJson(edge),
              "score" -> rank
            )
          }
        )
      )
    }

    ret.toList
  }

  /**
   * Sorts grouped result objects by `scoreField` (descending when `decrease`) and wraps
   * them with "size"/"results" plus an "impressionId" when any request is present.
   */
  def sortWithFormatted(jsons: Seq[JsObject],
                        scoreField: String = "scoreSum",
                        queryRequestWithResultLs: Seq[QueryRequestWithResult],
                        decrease: Boolean = true): JsObject = {
    val ordering = if (decrease) -1 else 1
    val sortedJsons = jsons.sortBy { jsObject => (jsObject \ scoreField).as[Double] * ordering }

    if (queryRequestWithResultLs.isEmpty) Json.obj("size" -> sortedJsons.size, "results" -> sortedJsons)
    else Json.obj(
      "size" -> sortedJsons.size,
      "results" -> sortedJsons,
      "impressionId" -> queryRequestWithResultLs.head.queryRequest.query.impressionId()
    )
  }

  /**
   * Hash key over the edge's values for the given fields; used for filterOut/exclude matching.
   * NOTE(review): mixes raw values (String/Long/InnerValLike) and JsString for "direction" in
   * the hashed sequence — preserved as-is since exclude matching depends on the exact hash.
   * `delimiter` is currently unused; kept for call-site compatibility.
   *
   * @throws RuntimeException when `field` names a prop the label does not define.
   */
  private def toHashKey(edge: Edge, queryParam: QueryParam, fields: Seq[String], delimiter: String = ","): Int = {
    val ls = for {
      field <- fields
    } yield {
      field match {
        case "from" | "_from" => edge.srcVertex.innerId.toIdString()
        case "to" | "_to" => edge.tgtVertex.innerId.toIdString()
        case "label" => edge.labelWithDir.labelId
        case "direction" => JsString(GraphUtil.fromDirection(edge.labelWithDir.dir))
        case "_timestamp" | "timestamp" => edge.ts
        case _ =>
          queryParam.label.metaPropsInvMap.get(field) match {
            case None => throw new RuntimeException(s"unknown column: $field")
            case Some(labelMeta) => edge.propsWithTs.get(labelMeta.seq) match {
              case None => labelMeta.defaultValue
              case Some(propVal) => propVal
            }
          }
      }
    }
    ls.hashCode()
  }

  /**
   * Hash keys (per query's filterOutFields) for every edge in the results.
   * NOTE(review): despite the name, these are hash keys, not vertex inner ids;
   * `isSrcVertex` is currently unused. Both kept for interface compatibility.
   */
  def resultInnerIds(queryRequestWithResultLs: Seq[QueryRequestWithResult], isSrcVertex: Boolean = false): Seq[Int] = {
    for {
      queryRequestWithResult <- queryRequestWithResultLs
      (queryRequest, queryResult) = QueryRequestWithResult.unapply(queryRequestWithResult).get
      q = queryRequest.query
      edgeWithScore <- queryResult.edgeWithScoreLs
      (edge, score) = EdgeWithScore.unapply(edgeWithScore).get
    } yield toHashKey(edge, queryRequest.queryParam, q.filterOutFields)
  }

  /** Grouped + descending-sorted summary (explicit decrease = true). */
  def summarizeWithListExcludeFormatted(queryRequestWithResultLs: Seq[QueryRequestWithResult], exclude: Seq[QueryRequestWithResult]) = {
    val jsons = groupEdgeResult(queryRequestWithResultLs, exclude)
    sortWithFormatted(jsons, queryRequestWithResultLs = queryRequestWithResultLs, decrease = true)
  }

  /** Grouped summary with default ordering. NOTE(review): identical to summarizeWithListFormatted. */
  def summarizeWithList(queryRequestWithResultLs: Seq[QueryRequestWithResult], exclude: Seq[QueryRequestWithResult]) = {
    val jsons = groupEdgeResult(queryRequestWithResultLs, exclude)
    sortWithFormatted(jsons, queryRequestWithResultLs = queryRequestWithResultLs)
  }

  /** Grouped summary with default ordering. NOTE(review): identical to summarizeWithList. */
  def summarizeWithListFormatted(queryRequestWithResultLs: Seq[QueryRequestWithResult], exclude: Seq[QueryRequestWithResult]) = {
    val jsons = groupEdgeResult(queryRequestWithResultLs, exclude)
    sortWithFormatted(jsons, queryRequestWithResultLs = queryRequestWithResultLs)
  }

  /** Convenience overload with no excludes. */
  def toSimpleVertexArrJson(queryRequestWithResultLs: Seq[QueryRequestWithResult]): JsValue = {
    toSimpleVertexArrJson(queryRequestWithResultLs, Seq.empty[QueryRequestWithResult])
  }

  /** Sorts raw edges by their pre-computed ORDER_BY_VALUES when ordering is requested; no-op otherwise. */
  private def orderBy(queryOption: QueryOption,
                      rawEdges: ListBuffer[(EDGE_VALUES, Double, ORDER_BY_VALUES)]): ListBuffer[(EDGE_VALUES, Double, ORDER_BY_VALUES)] = {
    import OrderingUtil._

    if (queryOption.withScore && queryOption.orderByColumns.nonEmpty) {
      val ascendingLs = queryOption.orderByColumns.map(_._2)
      rawEdges.sortBy(_._3)(TupleMultiOrdering[Any](ascendingLs))
    } else {
      rawEdges
    }
  }

  /**
   * Resolves an order-by column to a comparable value from the rendered edge.
   * NOTE(review): the final `.get` throws if the column is in neither the top-level map nor
   * "props" — buildRawEdges pre-filters order-by columns to ones present on every label,
   * which presumably guarantees presence; confirm before changing.
   */
  private def getColumnValue(keyWithJs: Map[String, JsValue], score: Double, edge: Edge, column: String): Any = {
    column match {
      case "score" => score
      case "timestamp" | "_timestamp" => edge.ts
      case _ =>
        keyWithJs.get(column) match {
          case None => keyWithJs.get("props").map { js => (js \ column).as[JsValue] }.get
          case Some(x) => x
        }
    }
  }

  /** Recursively applies `mapper` to every node of a JsValue tree, rebuilding arrays/objects. */
  private def buildReplaceJson(jsValue: JsValue)(mapper: JsValue => JsValue): JsValue = {
    def traverse(js: JsValue): JsValue = js match {
      case JsNull => mapper(JsNull)
      case JsUndefined() => mapper(JsUndefined(""))
      case JsNumber(v) => mapper(js)
      case JsString(v) => mapper(js)
      case JsBoolean(v) => mapper(js)
      case JsArray(elements) => JsArray(elements.map { t => traverse(mapper(t)) })
      case JsObject(values) => JsObject(values.map { case (k, v) => k -> traverse(mapper(v)) })
    }

    traverse(jsValue)
  }

  /** test query with filterOut is not working since it can not differentiate filterOut */
private def buildNextQuery(jsonQuery: JsValue, _cursors: Seq[Seq[String]]): JsValue = { + val cursors = _cursors.flatten.iterator + + buildReplaceJson(jsonQuery) { + case js@JsObject(fields) => + val isStep = fields.find { case (k, _) => k == "label" } // find label group + if (isStep.isEmpty) js + else { + // TODO: Order not ensured + val withCursor = js.fieldSet | Set("cursor" -> JsString(cursors.next)) + JsObject(withCursor.toSeq) + } + case js => js + } + } + + private def buildRawEdges(queryOption: QueryOption, + queryRequestWithResultLs: Seq[QueryRequestWithResult], + excludeIds: Map[Int, Boolean], + scoreWeight: Double = 1.0): (ListBuffer[JsValue], ListBuffer[RAW_EDGE]) = { + val degrees = ListBuffer[JsValue]() + val rawEdges = ListBuffer[(EDGE_VALUES, Double, ORDER_BY_VALUES)]() + val metaPropNamesMap = scala.collection.mutable.Map[String, Int]() + for { + queryRequestWithResult <- queryRequestWithResultLs + } yield { + val (queryRequest, _) = QueryRequestWithResult.unapply(queryRequestWithResult).get + queryRequest.queryParam.label.metaPropNames.foreach { metaPropName => + metaPropNamesMap.put(metaPropName, metaPropNamesMap.getOrElse(metaPropName, 0) + 1) + } + } + val propsExistInAll = metaPropNamesMap.filter(kv => kv._2 == queryRequestWithResultLs.length) + val orderByColumns = queryOption.orderByColumns.filter { case (column, _) => + column match { + case "from" | "to" | "label" | "score" | "timestamp" | "_timestamp" => true + case _ => + propsExistInAll.contains(column) +// //TODO?? 
+// false +// queryParam.label.metaPropNames.contains(column) + } + } + + /** build result jsons */ + for { + queryRequestWithResult <- queryRequestWithResultLs + (queryRequest, queryResult) = QueryRequestWithResult.unapply(queryRequestWithResult).get + queryParam = queryRequest.queryParam + edgeWithScore <- queryResult.edgeWithScoreLs + (edge, score) = EdgeWithScore.unapply(edgeWithScore).get + hashKey = toHashKey(edge, queryRequest.queryParam, queryOption.filterOutFields) + if !excludeIds.contains(hashKey) + } { + + // edge to json + val (srcColumn, _) = queryParam.label.srcTgtColumn(edge.labelWithDir.dir) + val fromOpt = innerValToJsValue(edge.srcVertex.id.innerId, srcColumn.columnType) + if (edge.isDegree && fromOpt.isDefined) { + if (queryOption.limitOpt.isEmpty) { + degrees += Json.obj( + "from" -> fromOpt.get, + "label" -> queryRequest.queryParam.label.label, + "direction" -> GraphUtil.fromDirection(edge.labelWithDir.dir), + LabelMeta.degree.name -> innerValToJsValue(edge.propsWithTs(LabelMeta.degreeSeq).innerVal, InnerVal.LONG) + ) + } + } else { + val keyWithJs = edgeToJson(edge, score, queryRequest.query, queryRequest.queryParam) + val orderByValues: (Any, Any, Any, Any) = orderByColumns.length match { + case 0 => + (None, None, None, None) + case 1 => + val it = orderByColumns.iterator + val v1 = getColumnValue(keyWithJs, score, edge, it.next()._1) + (v1, None, None, None) + case 2 => + val it = orderByColumns.iterator + val v1 = getColumnValue(keyWithJs, score, edge, it.next()._1) + val v2 = getColumnValue(keyWithJs, score, edge, it.next()._1) + (v1, v2, None, None) + case 3 => + val it = orderByColumns.iterator + val v1 = getColumnValue(keyWithJs, score, edge, it.next()._1) + val v2 = getColumnValue(keyWithJs, score, edge, it.next()._1) + val v3 = getColumnValue(keyWithJs, score, edge, it.next()._1) + (v1, v2, v3, None) + case _ => + val it = orderByColumns.iterator + val v1 = getColumnValue(keyWithJs, score, edge, it.next()._1) + val v2 = 
getColumnValue(keyWithJs, score, edge, it.next()._1) + val v3 = getColumnValue(keyWithJs, score, edge, it.next()._1) + val v4 = getColumnValue(keyWithJs, score, edge, it.next()._1) + (v1, v2, v3, v4) + } + + val currentEdge = (keyWithJs, score * scoreWeight, orderByValues) + rawEdges += currentEdge + } + } + (degrees, rawEdges) + } + + private def buildResultJsValue(queryOption: QueryOption, + degrees: ListBuffer[JsValue], + rawEdges: ListBuffer[RAW_EDGE]): JsValue = { + if (queryOption.groupByColumns.isEmpty) { + // ordering + val filteredEdges = rawEdges.filter(t => t._2 >= queryOption.scoreThreshold) + + val edges = queryOption.limitOpt match { + case None => orderBy(queryOption, filteredEdges).map(_._1) + case Some(limit) => orderBy(queryOption, filteredEdges).map(_._1).take(limit) + } + val resultDegrees = if (queryOption.returnDegree) degrees else emptyDegrees + + Json.obj( + "size" -> edges.size, + "degrees" -> resultDegrees, +// "queryNext" -> buildNextQuery(q.jsonQuery, q.cursorStrings()), + "results" -> edges + ) + } else { + val grouped = rawEdges.groupBy { case (keyWithJs, _, _) => + val props = keyWithJs.get("props") + + for { + column <- queryOption.groupByColumns + value <- keyWithJs.get(column) match { + case None => props.flatMap { js => (js \ column).asOpt[JsValue] } + case Some(x) => Some(x) + } + } yield column -> value + } + + val groupedEdgesWithScoreSum = + for { + (groupByKeyVals, groupedRawEdges) <- grouped + scoreSum = groupedRawEdges.map(x => x._2).sum if scoreSum >= queryOption.scoreThreshold + } yield { + // ordering + val edges = orderBy(queryOption, groupedRawEdges).map(_._1) + + //TODO: refactor this + val js = if (queryOption.returnAgg) + Json.obj( + "groupBy" -> Json.toJson(groupByKeyVals.toMap), + "scoreSum" -> scoreSum, + "agg" -> edges + ) + else + Json.obj( + "groupBy" -> Json.toJson(groupByKeyVals.toMap), + "scoreSum" -> scoreSum, + "agg" -> Json.arr() + ) + (js, scoreSum) + } + + val groupedSortedJsons = queryOption.limitOpt 
match { + case None => + groupedEdgesWithScoreSum.toList.sortBy { case (jsVal, scoreSum) => scoreSum * -1 }.map(_._1) + case Some(limit) => + groupedEdgesWithScoreSum.toList.sortBy { case (jsVal, scoreSum) => scoreSum * -1 }.map(_._1).take(limit) + } + val resultDegrees = if (queryOption.returnDegree) degrees else emptyDegrees + Json.obj( + "size" -> groupedSortedJsons.size, + "degrees" -> resultDegrees, +// "queryNext" -> buildNextQuery(q.jsonQuery, q.cursorStrings()), + "results" -> groupedSortedJsons + ) + } + } + + def toSimpleVertexArrJsonMulti(queryOption: QueryOption, + resultWithExcludeLs: Seq[(Seq[QueryRequestWithResult], Seq[QueryRequestWithResult])], + excludes: Seq[QueryRequestWithResult]): JsValue = { + val excludeIds = (Seq((Seq.empty, excludes)) ++ resultWithExcludeLs).foldLeft(Map.empty[Int, Boolean]) { case (acc, (result, excludes)) => + acc ++ resultInnerIds(excludes).map(hashKey => hashKey -> true).toMap + } + + val (degrees, rawEdges) = (ListBuffer.empty[JsValue], ListBuffer.empty[RAW_EDGE]) + for { + (result, localExclude) <- resultWithExcludeLs + } { + val newResult = result.map { queryRequestWithResult => + val (queryRequest, _) = QueryRequestWithResult.unapply(queryRequestWithResult).get + val newQuery = queryRequest.query.copy(queryOption = queryOption) + queryRequestWithResult.copy(queryRequest = queryRequest.copy(query = newQuery)) + } + val (_degrees, _rawEdges) = buildRawEdges(queryOption, newResult, excludeIds) + degrees ++= _degrees + rawEdges ++= _rawEdges + } + buildResultJsValue(queryOption, degrees, rawEdges) + } + + def toSimpleVertexArrJson(queryOption: QueryOption, + queryRequestWithResultLs: Seq[QueryRequestWithResult], + exclude: Seq[QueryRequestWithResult]): JsValue = { + val excludeIds = resultInnerIds(exclude).map(innerId => innerId -> true).toMap + val (degrees, rawEdges) = buildRawEdges(queryOption, queryRequestWithResultLs, excludeIds) + buildResultJsValue(queryOption, degrees, rawEdges) + } + + + def 
toSimpleVertexArrJson(queryRequestWithResultLs: Seq[QueryRequestWithResult], + exclude: Seq[QueryRequestWithResult]): JsValue = { + + queryRequestWithResultLs.headOption.map { queryRequestWithResult => + val (queryRequest, _) = QueryRequestWithResult.unapply(queryRequestWithResult).get + val query = queryRequest.query + val queryOption = query.queryOption + val excludeIds = resultInnerIds(exclude).map(innerId => innerId -> true).toMap + val (degrees, rawEdges) = buildRawEdges(queryOption, queryRequestWithResultLs, excludeIds) + buildResultJsValue(queryOption, degrees, rawEdges) + } getOrElse emptyResults + } + + def verticesToJson(vertices: Iterable[Vertex]) = { + Json.toJson(vertices.flatMap { v => vertexToJson(v) }) + } + + def propsToJson(edge: Edge, q: Query, queryParam: QueryParam): Map[String, JsValue] = { + for { + (seq, innerValWithTs) <- edge.propsWithTs if LabelMeta.isValidSeq(seq) + labelMeta <- queryParam.label.metaPropsMap.get(seq) + jsValue <- innerValToJsValue(innerValWithTs.innerVal, labelMeta.dataType) + } yield labelMeta.name -> jsValue + } + + private def edgeParent(parentEdges: Seq[EdgeWithScore], q: Query, queryParam: QueryParam): JsValue = { + if (parentEdges.isEmpty) { + JsNull + } else { + val parents = for { + parent <- parentEdges + (parentEdge, parentScore) = EdgeWithScore.unapply(parent).get + parentQueryParam = QueryParam(parentEdge.labelWithDir) + parents = edgeParent(parentEdge.parentEdges, q, parentQueryParam) if parents != JsNull + } yield { + val originalEdge = parentEdge.originalEdgeOpt.getOrElse(parentEdge) + val edgeJson = edgeToJsonInner(originalEdge, parentScore, q, parentQueryParam) + ("parents" -> parents) + Json.toJson(edgeJson) + } + + Json.toJson(parents) + } + } + + /** TODO */ + def edgeToJsonInner(edge: Edge, score: Double, q: Query, queryParam: QueryParam): Map[String, JsValue] = { + val (srcColumn, tgtColumn) = queryParam.label.srcTgtColumn(edge.labelWithDir.dir) + + val kvMapOpt = for { + from <- 
innerValToJsValue(edge.srcVertex.id.innerId, srcColumn.columnType) + to <- innerValToJsValue(edge.tgtVertex.id.innerId, tgtColumn.columnType) + } yield { + val targetColumns = if (q.selectColumnsSet.isEmpty) reservedColumns else (reservedColumns & q.selectColumnsSet) + "props" + val _propsMap = queryParam.label.metaPropsDefaultMapInner ++ propsToJson(edge, q, queryParam) + val propsMap = if (q.selectColumnsSet.nonEmpty) _propsMap.filterKeys(q.selectColumnsSet) else _propsMap + + val kvMap = targetColumns.foldLeft(Map.empty[String, JsValue]) { (map, column) => + val jsValue = column match { + case "cacheRemain" => JsNumber(queryParam.cacheTTLInMillis - (System.currentTimeMillis() - queryParam.timestamp)) + case "from" => from + case "to" => to + case "label" => JsString(queryParam.label.label) + case "direction" => JsString(GraphUtil.fromDirection(edge.labelWithDir.dir)) + case "_timestamp" | "timestamp" => JsNumber(edge.ts) + case "score" => JsNumber(score) + case "props" if propsMap.nonEmpty => Json.toJson(propsMap) + case _ => JsNull + } + + if (jsValue == JsNull) map else map + (column -> jsValue) + } + kvMap + } + + kvMapOpt.getOrElse(Map.empty) + } + + def edgeToJson(edge: Edge, score: Double, q: Query, queryParam: QueryParam): Map[String, JsValue] = { + val kvs = edgeToJsonInner(edge, score, q, queryParam) + if (kvs.nonEmpty && q.returnTree) kvs + ("parents" -> Json.toJson(edgeParent(edge.parentEdges, q, queryParam))) + else kvs + } + + def vertexToJson(vertex: Vertex): Option[JsObject] = { + val serviceColumn = ServiceColumn.findById(vertex.id.colId) + + for { + id <- innerValToJsValue(vertex.innerId, serviceColumn.columnType) + } yield { + Json.obj("serviceName" -> serviceColumn.service.serviceName, + "columnName" -> serviceColumn.columnName, + "id" -> id, "props" -> propsToJson(vertex), + "timestamp" -> vertex.ts, + // "belongsTo" -> vertex.belongLabelIds) + "belongsTo" -> vertex.belongLabelIds.flatMap(Label.findByIdOpt(_).map(_.label))) + } + } + + 
private def keysToName(seqsToNames: Map[Int, String], props: Map[Int, InnerValLike]) = { + for { + (seq, value) <- props + name <- seqsToNames.get(seq) + } yield (name, value) + } + + private def propsToJson(vertex: Vertex) = { + val serviceColumn = vertex.serviceColumn + val props = for { + (propKey, innerVal) <- vertex.props + columnMeta <- ColumnMeta.findByIdAndSeq(serviceColumn.id.get, propKey.toByte, useCache = true) + jsValue <- innerValToJsValue(innerVal, columnMeta.dataType) + } yield { + (columnMeta.name -> jsValue) + } + props.toMap + } + + @deprecated(message = "deprecated", since = "0.2") + def propsToJson(edge: Edge) = { + for { + (seq, v) <- edge.propsWithTs if LabelMeta.isValidSeq(seq) + metaProp <- edge.label.metaPropsMap.get(seq) + jsValue <- innerValToJsValue(v.innerVal, metaProp.dataType) + } yield { + (metaProp.name, jsValue) + } + } + + @deprecated(message = "deprecated", since = "0.2") + def summarizeWithListExclude(queryRequestWithResultLs: Seq[QueryRequestWithResult], exclude: Seq[QueryRequestWithResult]): JsObject = { + val excludeIds = resultInnerIds(exclude).map(innerId => innerId -> true).toMap + + + val groupedEdgesWithRank = (for { + queryRequestWithResult <- queryRequestWithResultLs + (queryRequest, queryResult) = QueryRequestWithResult.unapply(queryRequestWithResult).get + edgeWithScore <- queryResult.edgeWithScoreLs + (edge, score) = EdgeWithScore.unapply(edgeWithScore).get + if !excludeIds.contains(toHashKey(edge, queryRequest.queryParam, queryRequest.query.filterOutFields)) + } yield { + (edge, score) + }).groupBy { case (edge, score) => + (edge.label.tgtColumn, edge.label.srcColumn, edge.tgtVertex.innerId) + } + + val jsons = for { + ((tgtColumn, srcColumn, target), edgesAndRanks) <- groupedEdgesWithRank + (edges, ranks) = edgesAndRanks.groupBy(x => x._1.srcVertex).map(_._2.head).unzip + tgtId <- innerValToJsValue(target, tgtColumn.columnType) + } yield { + Json.obj(tgtColumn.columnName -> tgtId, + s"${srcColumn.columnName}s" -> 
+ edges.flatMap(edge => innerValToJsValue(edge.srcVertex.innerId, srcColumn.columnType)), "scoreSum" -> ranks.sum) + } + val sortedJsons = jsons.toList.sortBy { jsObj => (jsObj \ "scoreSum").as[Double] }.reverse + if (queryRequestWithResultLs.isEmpty) { + Json.obj("size" -> sortedJsons.size, "results" -> sortedJsons) + } else { + Json.obj("size" -> sortedJsons.size, "results" -> sortedJsons, + "impressionId" -> queryRequestWithResultLs.head.queryRequest.query.impressionId()) + } + + } + + +}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala b/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala new file mode 100644 index 0000000..2febadd --- /dev/null +++ b/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala @@ -0,0 +1,583 @@ +package org.apache.s2graph.core + +import com.google.common.hash.Hashing +import org.apache.hadoop.hbase.util.Bytes +import org.apache.s2graph.core.mysqls.{Label, LabelIndex, LabelMeta} +import org.apache.s2graph.core.parsers.{Where, WhereParser} +import org.apache.s2graph.core.types.{HBaseSerializable, InnerVal, InnerValLike, LabelWithDirection} +import org.hbase.async.ColumnRangeFilter +import play.api.libs.json.{JsNull, JsNumber, JsValue, Json} + +import scala.util.hashing.MurmurHash3 +import scala.util.{Success, Try} + +object Query { + val initialScore = 1.0 + lazy val empty = Query() + + def toQuery(srcVertices: Seq[Vertex], queryParam: QueryParam) = Query(srcVertices, Vector(Step(List(queryParam)))) + + object DuplicatePolicy extends Enumeration { + type DuplicatePolicy = Value + val First, Sum, CountSum, Raw = Value + + def apply(policy: String): Value = { + policy match { + case "sum" => Query.DuplicatePolicy.Sum + case "countSum" => Query.DuplicatePolicy.CountSum + case "raw" => Query.DuplicatePolicy.Raw + case _ => DuplicatePolicy.First + } + } + } +} + +case class MultiQuery(queries: Seq[Query], + weights: Seq[Double], + queryOption: QueryOption, + jsonQuery: JsValue = JsNull) + +case class QueryOption(removeCycle: Boolean = false, + selectColumns: Seq[String] = Seq.empty, + groupByColumns: Seq[String] = Seq.empty, + orderByColumns: Seq[(String, Boolean)] = Seq.empty, + filterOutQuery: Option[Query] = None, + filterOutFields: Seq[String] = Seq(LabelMeta.to.name), + 
withScore: Boolean = true, + returnTree: Boolean = false, + limitOpt: Option[Int] = None, + returnAgg: Boolean = true, + scoreThreshold: Double = Double.MinValue, + returnDegree: Boolean = true) + + +case class Query(vertices: Seq[Vertex] = Seq.empty[Vertex], + steps: IndexedSeq[Step] = Vector.empty[Step], + queryOption: QueryOption = QueryOption(), + jsonQuery: JsValue = JsNull) { + + val removeCycle = queryOption.removeCycle + val selectColumns = queryOption.selectColumns +// val groupBy = queryOption.groupBy + val groupByColumns = queryOption.groupByColumns + val orderByColumns = queryOption.orderByColumns + val filterOutQuery = queryOption.filterOutQuery + val filterOutFields = queryOption.filterOutFields + val withScore = queryOption.withScore + val returnTree = queryOption.returnTree + val limitOpt = queryOption.limitOpt + val returnAgg = queryOption.returnAgg + val returnDegree = queryOption.returnDegree + + def cacheKeyBytes: Array[Byte] = { + val selectBytes = Bytes.toBytes(queryOption.selectColumns.toString) + val groupBytes = Bytes.toBytes(queryOption.groupByColumns.toString) + val orderByBytes = Bytes.toBytes(queryOption.orderByColumns.toString) + val filterOutBytes = queryOption.filterOutQuery.map(_.cacheKeyBytes).getOrElse(Array.empty[Byte]) + val returnTreeBytes = Bytes.toBytes(queryOption.returnTree) + + Seq(selectBytes, groupBytes, orderByBytes, filterOutBytes, returnTreeBytes).foldLeft(Array.empty[Byte])(Bytes.add) + } + + lazy val selectColumnsSet = queryOption.selectColumns.map { c => + if (c == "_from") "from" + else if (c == "_to") "to" + else c + }.toSet + + /** return logical query id without considering parameter values */ + def templateId(): JsValue = { + Json.toJson(for { + step <- steps + queryParam <- step.queryParams.sortBy(_.labelWithDir.labelId) + } yield { + Json.obj("label" -> queryParam.label.label, "direction" -> GraphUtil.fromDirection(queryParam.labelWithDir.dir)) + }) + } + + def impressionId(): JsNumber = { + val hash = 
MurmurHash3.stringHash(templateId().toString()) + JsNumber(hash) + } + + def cursorStrings(): Seq[Seq[String]] = { + //Don`t know how to replace all cursor keys in json + steps.map { step => + step.queryParams.map { queryParam => + queryParam.cursorOpt.getOrElse("") + } + } + } +} + +object EdgeTransformer { + val DefaultTransformField = Json.arr("_to") + val DefaultTransformFieldAsList = Json.arr("_to").as[List[String]] + val DefaultJson = Json.arr(DefaultTransformField) +} + +/** + * TODO: step wise outputFields should be used with nextStepLimit, nextStepThreshold. + * @param jsValue + */ +case class EdgeTransformer(queryParam: QueryParam, jsValue: JsValue) { + val Delimiter = "\\$" + val targets = jsValue.asOpt[List[Vector[String]]].toList + val fieldsLs = for { + target <- targets + fields <- target + } yield fields + val isDefault = fieldsLs.size == 1 && fieldsLs.head.size == 1 && (fieldsLs.head.head == "_to" || fieldsLs.head.head == "to") + + def toHashKeyBytes: Array[Byte] = if (isDefault) Array.empty[Byte] else Bytes.toBytes(jsValue.toString) + + def replace(fmt: String, + values: Seq[InnerValLike], + nextStepOpt: Option[Step]): Seq[InnerValLike] = { + + val tokens = fmt.split(Delimiter) + val _values = values.padTo(tokens.length, InnerVal.withStr("", queryParam.label.schemaVersion)) + val mergedStr = tokens.zip(_values).map { case (prefix, innerVal) => prefix + innerVal.toString }.mkString + // logger.error(s"${tokens.toList}, ${values}, $mergedStr") + // println(s"${tokens.toList}, ${values}, $mergedStr") + nextStepOpt match { + case None => + val columnType = + if (queryParam.labelWithDir.dir == GraphUtil.directions("out")) queryParam.label.tgtColumnType + else queryParam.label.srcColumnType + + if (columnType == InnerVal.STRING) Seq(InnerVal.withStr(mergedStr, queryParam.label.schemaVersion)) + else Nil + case Some(nextStep) => + val nextQueryParamsValid = nextStep.queryParams.filter { qParam => + if (qParam.labelWithDir.dir == 
GraphUtil.directions("out")) qParam.label.srcColumnType == "string" + else qParam.label.tgtColumnType == "string" + } + for { + nextQueryParam <- nextQueryParamsValid + } yield { + InnerVal.withStr(mergedStr, nextQueryParam.label.schemaVersion) + } + } + } + + def toInnerValOpt(edge: Edge, fieldName: String): Option[InnerValLike] = { + fieldName match { + case LabelMeta.to.name => Option(edge.tgtVertex.innerId) + case LabelMeta.from.name => Option(edge.srcVertex.innerId) + case _ => + for { + labelMeta <- queryParam.label.metaPropsInvMap.get(fieldName) + value <- edge.propsWithTs.get(labelMeta.seq) + } yield value.innerVal + } + } + + def transform(edge: Edge, nextStepOpt: Option[Step]): Seq[Edge] = { + if (isDefault) Seq(edge) + else { + val edges = for { + fields <- fieldsLs + innerVal <- { + if (fields.size == 1) { + val fieldName = fields.head + toInnerValOpt(edge, fieldName).toSeq + } else { + val fmt +: fieldNames = fields + replace(fmt, fieldNames.flatMap(fieldName => toInnerValOpt(edge, fieldName)), nextStepOpt) + } + } + } yield edge.updateTgtVertex(innerVal).copy(originalEdgeOpt = Option(edge)) + + + edges + } + } +} + +object Step { + val Delimiter = "|" +} + +case class Step(queryParams: List[QueryParam], + labelWeights: Map[Int, Double] = Map.empty, + // scoreThreshold: Double = 0.0, + nextStepScoreThreshold: Double = 0.0, + nextStepLimit: Int = -1, + cacheTTL: Long = -1) { + + lazy val excludes = queryParams.filter(_.exclude) + lazy val includes = queryParams.filterNot(_.exclude) + lazy val excludeIds = excludes.map(x => x.labelWithDir.labelId -> true).toMap + + def toCacheKey(lss: Seq[Long]): Long = Hashing.murmur3_128().hashBytes(toCacheKeyRaw(lss)).asLong() +// MurmurHash3.bytesHash(toCacheKeyRaw(lss)) + + def toCacheKeyRaw(lss: Seq[Long]): Array[Byte] = { + var bytes = Array.empty[Byte] + lss.sorted.foreach { h => bytes = Bytes.add(bytes, Bytes.toBytes(h)) } + bytes + } +} + +case class VertexParam(vertices: Seq[Vertex]) { + var filters: 
Option[Map[Byte, InnerValLike]] = None + + def has(what: Option[Map[Byte, InnerValLike]]): VertexParam = { + what match { + case None => this + case Some(w) => has(w) + } + } + + def has(what: Map[Byte, InnerValLike]): VertexParam = { + this.filters = Some(what) + this + } + +} + +//object RankParam { +// def apply(labelId: Int, keyAndWeights: Seq[(Byte, Double)]) = { +// new RankParam(labelId, keyAndWeights) +// } +//} + +case class RankParam(labelId: Int, var keySeqAndWeights: Seq[(Byte, Double)] = Seq.empty[(Byte, Double)]) { + // empty => Count + lazy val rankKeysWeightsMap = keySeqAndWeights.toMap + + def defaultKey() = { + this.keySeqAndWeights = List((LabelMeta.countSeq, 1.0)) + this + } + + def toHashKeyBytes(): Array[Byte] = { + var bytes = Array.empty[Byte] + keySeqAndWeights.map { case (key, weight) => + bytes = Bytes.add(bytes, Array.fill(1)(key), Bytes.toBytes(weight)) + } + bytes + } +} + +object QueryParam { + lazy val Empty = QueryParam(LabelWithDirection(0, 0)) + lazy val DefaultThreshold = Double.MinValue + val Delimiter = "," +} + +case class QueryParam(labelWithDir: LabelWithDirection, timestamp: Long = System.currentTimeMillis()) { + + import HBaseSerializable._ + import Query.DuplicatePolicy + import Query.DuplicatePolicy._ + + lazy val label = Label.findById(labelWithDir.labelId) + val DefaultKey = LabelIndex.DefaultSeq + val fullKey = DefaultKey + + var labelOrderSeq = fullKey + + var sample = -1 + var limit = 10 + var offset = 0 + var rank = new RankParam(labelWithDir.labelId, List(LabelMeta.countSeq -> 1)) + + var duration: Option[(Long, Long)] = None + var isInverted: Boolean = false + + var columnRangeFilter: ColumnRangeFilter = null + + var hasFilters: Map[Byte, InnerValLike] = Map.empty[Byte, InnerValLike] + var where: Try[Where] = Success(WhereParser.success) + var whereRawOpt: Option[String] = None + var duplicatePolicy = DuplicatePolicy.First + var rpcTimeoutInMillis = 1000 + var maxAttempt = 2 + var includeDegree = false + var 
tgtVertexInnerIdOpt: Option[InnerValLike] = None + var cacheTTLInMillis: Long = -1L + var threshold = QueryParam.DefaultThreshold + var timeDecay: Option[TimeDecay] = None + var transformer: EdgeTransformer = EdgeTransformer(this, EdgeTransformer.DefaultJson) + var scorePropagateOp: String = "multiply" + var exclude = false + var include = false + var shouldNormalize= false + var cursorOpt: Option[String] = None + + var columnRangeFilterMinBytes = Array.empty[Byte] + var columnRangeFilterMaxBytes = Array.empty[Byte] + + lazy val srcColumnWithDir = label.srcColumnWithDir(labelWithDir.dir) + lazy val tgtColumnWithDir = label.tgtColumnWithDir(labelWithDir.dir) + + def toBytes(idxSeq: Byte, offset: Int, limit: Int, isInverted: Boolean): Array[Byte] = { + val front = Array[Byte](idxSeq, if (isInverted) 1.toByte else 0.toByte) + Bytes.add(front, Bytes.toBytes((offset.toLong << 32 | limit))) + } + + /** + * consider only I/O specific parameters. + * properties that is used on Graph.filterEdges should not be considered. + * @param bytes + * @return + */ + def toCacheKey(bytes: Array[Byte]): Long = { + val hashBytes = toCacheKeyRaw(bytes) + Hashing.murmur3_128().hashBytes(hashBytes).asLong() +// MurmurHash3.bytesHash(hashBytes) + } + + def toCacheKeyRaw(bytes: Array[Byte]): Array[Byte] = { + val transformBytes = transformer.toHashKeyBytes + //TODO: change this to binrary format. 
+ val whereBytes = Bytes.toBytes(whereRawOpt.getOrElse("")) + val durationBytes = duration.map { case (min, max) => + val minTs = min / cacheTTLInMillis + val maxTs = max / cacheTTLInMillis + Bytes.add(Bytes.toBytes(minTs), Bytes.toBytes(maxTs)) + } getOrElse Array.empty[Byte] +// Bytes.toBytes(duration.toString) + val conditionBytes = Bytes.add(transformBytes, whereBytes, durationBytes) + Bytes.add(Bytes.add(bytes, labelWithDir.bytes, toBytes(labelOrderSeq, offset, limit, isInverted)), rank.toHashKeyBytes(), + Bytes.add(columnRangeFilterMinBytes, columnRangeFilterMaxBytes, conditionBytes)) + } + + def isInverted(isInverted: Boolean): QueryParam = { + this.isInverted = isInverted + this + } + + def labelOrderSeq(labelOrderSeq: Byte): QueryParam = { + this.labelOrderSeq = labelOrderSeq + this + } + + def sample(n: Int): QueryParam = { + this.sample = n + this + } + + def limit(offset: Int, limit: Int): QueryParam = { + /** since degree info is located on first always */ + if (offset == 0 && this.columnRangeFilter == null) { + this.limit = limit + 1 + this.offset = offset + } else { + this.limit = limit + this.offset = offset + 1 + } + // this.columnPaginationFilter = new ColumnPaginationFilter(this.limit, this.offset) + this + } + + def interval(fromTo: Option[(Seq[(Byte, InnerValLike)], Seq[(Byte, InnerValLike)])]): QueryParam = { + fromTo match { + case Some((from, to)) => interval(from, to) + case _ => this + } + } + + def interval(from: Seq[(Byte, InnerValLike)], to: Seq[(Byte, InnerValLike)]): QueryParam = { + // val len = label.orderTypes.size.toByte + // val len = label.extraIndicesMap(labelOrderSeq).sortKeyTypes.size.toByte + // logger.error(s"indicesMap: ${label.indicesMap(labelOrderSeq)}") + val len = label.indicesMap(labelOrderSeq).sortKeyTypes.size.toByte + + val minMetaByte = InnerVal.minMetaByte + // val maxMetaByte = InnerVal.maxMetaByte + val maxMetaByte = -1.toByte + val toVal = Bytes.add(propsToBytes(to), Array.fill(1)(minMetaByte)) + //FIXME + val 
fromVal = Bytes.add(propsToBytes(from), Array.fill(10)(maxMetaByte)) + toVal(0) = len + fromVal(0) = len + val maxBytes = fromVal + val minBytes = toVal + this.columnRangeFilterMaxBytes = maxBytes + this.columnRangeFilterMinBytes = minBytes + val rangeFilter = new ColumnRangeFilter(minBytes, true, maxBytes, true) + this.columnRangeFilter = rangeFilter + this + } + + def duration(minMaxTs: Option[(Long, Long)]): QueryParam = { + minMaxTs match { + case Some((minTs, maxTs)) => duration(minTs, maxTs) + case _ => this + } + } + + def duration(minTs: Long, maxTs: Long): QueryParam = { + this.duration = Some((minTs, maxTs)) + this + } + + def rank(r: RankParam): QueryParam = { + this.rank = r + this + } + + def exclude(filterOut: Boolean): QueryParam = { + this.exclude = filterOut + this + } + + def include(filterIn: Boolean): QueryParam = { + this.include = filterIn + this + } + + def has(hasFilters: Map[Byte, InnerValLike]): QueryParam = { + this.hasFilters = hasFilters + this + } + + def where(whereTry: Try[Where]): QueryParam = { + this.where = whereTry + this + } + + def duplicatePolicy(policy: Option[DuplicatePolicy]): QueryParam = { + this.duplicatePolicy = policy.getOrElse(DuplicatePolicy.First) + this + } + + def rpcTimeout(millis: Int): QueryParam = { + this.rpcTimeoutInMillis = millis + this + } + + def maxAttempt(attempt: Int): QueryParam = { + this.maxAttempt = attempt + this + } + + def includeDegree(includeDegree: Boolean): QueryParam = { + this.includeDegree = includeDegree + this + } + + def tgtVertexInnerIdOpt(other: Option[InnerValLike]): QueryParam = { + this.tgtVertexInnerIdOpt = other + this + } + + def cacheTTLInMillis(other: Long): QueryParam = { + this.cacheTTLInMillis = other + this + } + + def timeDecay(other: Option[TimeDecay]): QueryParam = { + this.timeDecay = other + this + } + + def threshold(other: Double): QueryParam = { + this.threshold = other + this + } + + def transformer(other: Option[JsValue]): QueryParam = { + other match { + case 
Some(js) => this.transformer = EdgeTransformer(this, js) + case None => + } + this + } + + def scorePropagateOp(scorePropagateOp: String): QueryParam = { + this.scorePropagateOp = scorePropagateOp + this + } + + def shouldNormalize(shouldNormalize: Boolean): QueryParam = { + this.shouldNormalize = shouldNormalize + this + } + + def whereRawOpt(sqlOpt: Option[String]): QueryParam = { + this.whereRawOpt = sqlOpt + this + } + + def cursorOpt(cursorOpt: Option[String]): QueryParam = { + this.cursorOpt = cursorOpt + this + } + + def isSnapshotEdge = tgtVertexInnerIdOpt.isDefined + + override def toString = { + List(label.label, labelOrderSeq, offset, limit, rank, + duration, isInverted, exclude, include, hasFilters).mkString("\t") + // duration, isInverted, exclude, include, hasFilters, outputFields).mkString("\t") + } + + // + // def buildGetRequest(srcVertex: Vertex) = { + // val (srcColumn, tgtColumn) = label.srcTgtColumn(labelWithDir.dir) + // val (srcInnerId, tgtInnerId) = tgtVertexInnerIdOpt match { + // case Some(tgtVertexInnerId) => // _to is given. 
+ // /** we use toInvertedEdgeHashLike so dont need to swap src, tgt */ + // val src = InnerVal.convertVersion(srcVertex.innerId, srcColumn.columnType, label.schemaVersion) + // val tgt = InnerVal.convertVersion(tgtVertexInnerId, tgtColumn.columnType, label.schemaVersion) + // (src, tgt) + // case None => + // val src = InnerVal.convertVersion(srcVertex.innerId, srcColumn.columnType, label.schemaVersion) + // (src, src) + // } + // + // val (srcVId, tgtVId) = (SourceVertexId(srcColumn.id.get, srcInnerId), TargetVertexId(tgtColumn.id.get, tgtInnerId)) + // val (srcV, tgtV) = (Vertex(srcVId), Vertex(tgtVId)) + // val edge = Edge(srcV, tgtV, labelWithDir) + // + // val get = if (tgtVertexInnerIdOpt.isDefined) { + // val snapshotEdge = edge.toInvertedEdgeHashLike + // val kv = snapshotEdge.kvs.head + // new GetRequest(label.hbaseTableName.getBytes, kv.row, edgeCf, kv.qualifier) + // } else { + // val indexedEdgeOpt = edge.edgesWithIndex.find(e => e.labelIndexSeq == labelOrderSeq) + // assert(indexedEdgeOpt.isDefined) + // val indexedEdge = indexedEdgeOpt.get + // val kv = indexedEdge.kvs.head + // val table = label.hbaseTableName.getBytes + // //kv.table // + // val rowKey = kv.row // indexedEdge.rowKey.bytes + // val cf = edgeCf + // new GetRequest(table, rowKey, cf) + // } + // + // val (minTs, maxTs) = duration.getOrElse((0L, Long.MaxValue)) + // + // get.maxVersions(1) + // get.setFailfast(true) + // get.setMaxResultsPerColumnFamily(limit) + // get.setRowOffsetPerColumnFamily(offset) + // get.setMinTimestamp(minTs) + // get.setMaxTimestamp(maxTs) + // get.setTimeout(rpcTimeoutInMillis) + // if (columnRangeFilter != null) get.setFilter(columnRangeFilter) + // // get.setMaxAttempt(maxAttempt.toByte) + // // get.setRpcTimeout(rpcTimeoutInMillis) + // + // // if (columnRangeFilter != null) get.filter(columnRangeFilter) + // // logger.debug(s"Get: $get, $offset, $limit") + // + // get + // } +} + +case class TimeDecay(initial: Double = 1.0, + lambda: Double = 0.1, + 
timeUnit: Double = 60 * 60 * 24, + labelMetaSeq: Byte = LabelMeta.timeStampSeq) { + def decay(diff: Double): Double = { + //FIXME + val ret = initial * Math.pow(1.0 - lambda, diff / timeUnit) + // logger.debug(s"$initial, $lambda, $timeUnit, $diff, ${diff / timeUnit}, $ret") + ret + } +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala b/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala new file mode 100644 index 0000000..550c4c9 --- /dev/null +++ b/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala @@ -0,0 +1,42 @@ +package org.apache.s2graph.core + +import org.apache.s2graph.core.mysqls.LabelMeta +import org.apache.s2graph.core.types.{InnerVal, InnerValLikeWithTs} + +import scala.collection.Seq + +object QueryResult { + def fromVertices(query: Query): Seq[QueryRequestWithResult] = { + if (query.steps.isEmpty || query.steps.head.queryParams.isEmpty) { + Seq.empty + } else { + val queryParam = query.steps.head.queryParams.head + val label = queryParam.label + val currentTs = System.currentTimeMillis() + val propsWithTs = Map(LabelMeta.timeStampSeq -> + InnerValLikeWithTs(InnerVal.withLong(currentTs, label.schemaVersion), currentTs)) + for { + vertex <- query.vertices + } yield { + val edge = Edge(vertex, vertex, queryParam.labelWithDir, propsWithTs = propsWithTs) + val edgeWithScore = EdgeWithScore(edge, Graph.DefaultScore) + QueryRequestWithResult(QueryRequest(query, -1, vertex, queryParam), + QueryResult(edgeWithScoreLs = Seq(edgeWithScore))) + } + } + } +} +case class QueryRequestWithResult(queryRequest: QueryRequest, queryResult: QueryResult) + +case class QueryRequest(query: Query, + stepIdx: Int, + vertex: Vertex, + queryParam: QueryParam) + + +case class QueryResult(edgeWithScoreLs: Seq[EdgeWithScore] 
= Nil, + tailCursor: Array[Byte] = Array.empty, + timestamp: Long = System.currentTimeMillis(), + isFailure: Boolean = false) + +case class EdgeWithScore(edge: Edge, score: Double) http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2core/src/main/scala/org/apache/s2graph/core/Vertex.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/Vertex.scala b/s2core/src/main/scala/org/apache/s2graph/core/Vertex.scala new file mode 100644 index 0000000..f4f40b7 --- /dev/null +++ b/s2core/src/main/scala/org/apache/s2graph/core/Vertex.scala @@ -0,0 +1,101 @@ +package org.apache.s2graph.core + +import org.apache.s2graph.core.mysqls.{ColumnMeta, Service, ServiceColumn} +import org.apache.s2graph.core.types.{InnerVal, InnerValLike, SourceVertexId, VertexId} +import play.api.libs.json.Json + +case class Vertex(id: VertexId, + ts: Long = System.currentTimeMillis(), + props: Map[Int, InnerValLike] = Map.empty[Int, InnerValLike], + op: Byte = 0, + belongLabelIds: Seq[Int] = Seq.empty) extends GraphElement { + + val innerId = id.innerId + + def schemaVer = serviceColumn.schemaVersion + + def serviceColumn = ServiceColumn.findById(id.colId) + + def service = Service.findById(serviceColumn.serviceId) + + lazy val (hbaseZkAddr, hbaseTableName) = (service.cluster, service.hTableName) + + def defaultProps = Map(ColumnMeta.lastModifiedAtColumnSeq.toInt -> InnerVal.withLong(ts, schemaVer)) + + // lazy val kvs = Graph.client.vertexSerializer(this).toKeyValues + + /** TODO: make this as configurable */ + override def serviceName = service.serviceName + + override def isAsync = false + + override def queueKey = Seq(ts.toString, serviceName).mkString("|") + + override def queuePartitionKey = id.innerId.toString + + def propsWithName = for { + (seq, v) <- props + meta <- ColumnMeta.findByIdAndSeq(id.colId, seq.toByte) + } yield (meta.name -> v.toString) + + // /** only used by bulk loader */ + 
// def buildPuts(): List[Put] = { + // // logger.error(s"put: $this => $rowKey") + //// val put = new Put(rowKey.bytes) + //// for ((q, v) <- qualifiersWithValues) { + //// put.addColumn(vertexCf, q, ts, v) + //// } + //// List(put) + // val kv = kvs.head + // val put = new Put(kv.row) + // kvs.map { kv => + // put.addColumn(kv.cf, kv.qualifier, kv.timestamp, kv.value) + // } + // List(put) + // } + + def toEdgeVertex() = Vertex(SourceVertexId(id.colId, innerId), ts, props, op) + + + override def hashCode() = { + val hash = id.hashCode() + // logger.debug(s"Vertex.hashCode: $this -> $hash") + hash + } + + override def equals(obj: Any) = { + obj match { + case otherVertex: Vertex => + val ret = id == otherVertex.id + // logger.debug(s"Vertex.equals: $this, $obj => $ret") + ret + case _ => false + } + } + + def withProps(newProps: Map[Int, InnerValLike]) = Vertex(id, ts, newProps, op) + + def toLogString(): String = { + val (serviceName, columnName) = + if (!id.storeColId) ("", "") + else (serviceColumn.service.serviceName, serviceColumn.columnName) + + if (propsWithName.nonEmpty) + Seq(ts, GraphUtil.fromOp(op), "v", id.innerId, serviceName, columnName, Json.toJson(propsWithName)).mkString("\t") + else + Seq(ts, GraphUtil.fromOp(op), "v", id.innerId, serviceName, columnName).mkString("\t") + } +} + +object Vertex { + + def toPropKey(labelId: Int): Int = Byte.MaxValue + labelId + + def toLabelId(propKey: Int): Int = propKey - Byte.MaxValue + + def isLabelId(propKey: Int): Boolean = propKey > Byte.MaxValue + + // val emptyVertex = Vertex(new CompositeId(CompositeId.defaultColId, CompositeId.defaultInnerId, false, true), + // System.currentTimeMillis()) + def fromString(s: String): Option[Vertex] = Graph.toVertex(s) +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Bucket.scala ---------------------------------------------------------------------- diff --git 
a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Bucket.scala b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Bucket.scala
new file mode 100644
index 0000000..c92c2a2
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Bucket.scala
@@ -0,0 +1,66 @@
package org.apache.s2graph.core.mysqls

/**
 * Created by shon on 8/5/15.
 */

import scalikejdbc._

object Bucket extends Model[Bucket] {

  // Separator for the "from~to" modular range stored in the `modular` column.
  val rangeDelimiter = "~"
  val INVALID_BUCKET_EXCEPTION = new RuntimeException("invalid bucket.")

  /** Maps one `buckets` row to a Bucket. */
  def apply(rs: WrappedResultSet): Bucket = {
    Bucket(rs.intOpt("id"),
      rs.int("experiment_id"),
      rs.string("modular"),
      rs.string("http_verb"),
      rs.string("api_path"),
      rs.string("request_body"),
      rs.int("timeout"),
      rs.string("impression_id"),
      rs.boolean("is_graph_query"),
      rs.boolean("is_empty"))
  }

  /** All buckets of an experiment (list-cached by experiment id). */
  def finds(experimentId: Int)(implicit session: DBSession = AutoSession): List[Bucket] = {
    val cacheKey = "experimentId=" + experimentId
    withCaches(cacheKey) {
      sql"""select * from buckets where experiment_id = $experimentId"""
        .map { rs => Bucket(rs) }.list().apply()
    }
  }

  /**
   * Parses a "from~to" range string into an inclusive (from, to) pair.
   *
   * Returns None when the string does not split into exactly two parts, or
   * when either part is not a valid integer. (Previously a non-numeric part
   * threw NumberFormatException despite the Option return type, which crashed
   * bucket selection for a single malformed `modular` row.)
   */
  def toRange(str: String): Option[(Int, Int)] = {
    val range = str.split(rangeDelimiter)
    if (range.length == 2) scala.util.Try((range.head.toInt, range.last.toInt)).toOption
    else None
  }

  /** Bucket lookup by impression id; `useCache = false` bypasses the cache. */
  def findByImpressionId(impressionId: String, useCache: Boolean = true)(implicit session: DBSession = AutoSession): Option[Bucket] = {
    val cacheKey = "impressionId=" + impressionId
    val sql = sql"""select * from buckets where impression_id=$impressionId"""
      .map { rs => Bucket(rs) }
    if (useCache) {
      withCache(cacheKey) {
        sql.single().apply()
      }
    } else {
      sql.single().apply()
    }
  }
}

case class Bucket(id: Option[Int],
                  experimentId: Int,
                  modular: String,
                  httpVerb: String, apiPath: String,
                  requestBody: String, timeout: Int, impressionId: String,
                  isGraphQuery: Boolean = true,
                  isEmpty: Boolean = false) {

  import Bucket._

  // Parsed "from~to" modular range; None when `modular` is malformed.
  lazy val rangeOpt = toRange(modular)
}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ColumnMeta.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ColumnMeta.scala b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ColumnMeta.scala
new file mode 100644
index 0000000..a726f40
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ColumnMeta.scala
@@ -0,0 +1,110 @@
package org.apache.s2graph.core.mysqls

import play.api.libs.json.Json
import scalikejdbc._

object ColumnMeta extends Model[ColumnMeta] {

  val timeStampSeq = 0.toByte
  val countSeq = -1.toByte
  val lastModifiedAtColumnSeq = 0.toByte
  val lastModifiedAtColumn = ColumnMeta(Some(0), 0, "lastModifiedAt", lastModifiedAtColumnSeq, "long")
  // seq is a Byte, so at most Byte.MaxValue metas per column.
  val maxValue = Byte.MaxValue

  /** Maps one `column_metas` row to a ColumnMeta; data_type is normalized to lower case. */
  def apply(rs: WrappedResultSet): ColumnMeta = {
    ColumnMeta(Some(rs.int("id")), rs.int("column_id"), rs.string("name"), rs.byte("seq"), rs.string("data_type").toLowerCase())
  }

  // NOTE(review): `.get` assumes the id exists — throws NoSuchElementException otherwise.
  def findById(id: Int)(implicit session: DBSession = AutoSession) = {
    val cacheKey = "id=" + id
    withCache(cacheKey) {
      sql"""select * from column_metas where id = ${id}""".map { rs => ColumnMeta(rs) }.single.apply
    }.get
  }

  /** All metas of a column ordered by seq; list-cached under "columnId=<id>". */
  def findAllByColumn(columnId: Int, useCache: Boolean = true)(implicit session: DBSession = AutoSession) = {
    val cacheKey = "columnId=" + columnId
    if (useCache) {
      // NOTE(review): "*from" (no space) is tolerated by MySQL; left byte-identical.
      withCaches(cacheKey)( sql"""select *from column_metas where column_id = ${columnId} order by seq ASC"""
        .map { rs => ColumnMeta(rs) }.list.apply())
    } else {
      sql"""select * from column_metas where column_id = ${columnId} order by seq ASC"""
        .map { rs => ColumnMeta(rs) }.list.apply()
    }
  }

  /** Meta lookup by (column, name); cached under "columnId=<id>:name=<name>". */
  def findByName(columnId: Int, name: String)(implicit session: DBSession = AutoSession) = {
    // val cacheKey =
s"columnId=$columnId:name=$name" + val cacheKey = "columnId=" + columnId + ":name=" + name + withCache(cacheKey)( sql"""select * from column_metas where column_id = ${columnId} and name = ${name}""" + .map { rs => ColumnMeta(rs) }.single.apply()) + } + + def insert(columnId: Int, name: String, dataType: String)(implicit session: DBSession = AutoSession) = { + val ls = findAllByColumn(columnId, false) + val seq = ls.size + 1 + if (seq <= maxValue) { + sql"""insert into column_metas(column_id, name, seq, data_type) + select ${columnId}, ${name}, ${seq}, ${dataType}""" + .updateAndReturnGeneratedKey.apply() + } + } + + def findOrInsert(columnId: Int, name: String, dataType: String)(implicit session: DBSession = AutoSession): ColumnMeta = { + findByName(columnId, name) match { + case Some(c) => c + case None => + insert(columnId, name, dataType) + expireCache(s"columnId=$columnId:name=$name") + findByName(columnId, name).get + } + } + + def findByIdAndSeq(columnId: Int, seq: Byte, useCache: Boolean = true)(implicit session: DBSession = AutoSession) = { + val cacheKey = "columnId=" + columnId + ":seq=" + seq + lazy val columnMetaOpt = sql""" + select * from column_metas where column_id = ${columnId} and seq = ${seq} + """.map { rs => ColumnMeta(rs) }.single.apply() + + if (useCache) withCache(cacheKey)(columnMetaOpt) + else columnMetaOpt + } + + def delete(id: Int)(implicit session: DBSession = AutoSession) = { + val columnMeta = findById(id) + val (columnId, name) = (columnMeta.columnId, columnMeta.name) + sql"""delete from column_metas where id = ${id}""".execute.apply() + val cacheKeys = List(s"id=$id", s"columnId=$columnId:name=$name", s"colunmId=$columnId") + cacheKeys.foreach { key => + expireCache(key) + expireCaches(key) + } + } + + def findAll()(implicit session: DBSession = AutoSession) = { + val ls = sql"""select * from column_metas""".map { rs => ColumnMeta(rs) }.list().apply() + + putsToCache(ls.map { x => + val cacheKey = s"id=${x.id.get}" + (cacheKey -> 
x)
    })
    putsToCache(ls.map { x =>
      val cacheKey = s"columnId=${x.columnId}:name=${x.name}"
      (cacheKey -> x)
    })
    putsToCache(ls.map { x =>
      val cacheKey = s"columnId=${x.columnId}:seq=${x.seq}"
      (cacheKey -> x)
    })
    putsToCaches(ls.groupBy(x => x.columnId).map { case (columnId, ls) =>
      val cacheKey = s"columnId=${columnId}"
      (cacheKey -> ls)
    }.toList)
  }
}

case class ColumnMeta(id: Option[Int], columnId: Int, name: String, seq: Byte, dataType: String) {
  lazy val toJson = Json.obj("name" -> name, "dataType" -> dataType)
}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Experiment.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Experiment.scala b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Experiment.scala
new file mode 100644
index 0000000..3b5fefb
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Experiment.scala
@@ -0,0 +1,77 @@
package org.apache.s2graph.core.mysqls

import org.apache.s2graph.core.GraphUtil
import scalikejdbc._

import scala.util.Random

object Experiment extends Model[Experiment] {
  // Response header that pins a caller to the bucket of a previous impression.
  val impressionKey = "S2-Impression-Id"

  /** Maps one `experiments` row to an Experiment. */
  def apply(rs: WrappedResultSet): Experiment =
    Experiment(rs.intOpt("id"),
      rs.int("service_id"),
      rs.string("name"),
      rs.string("description"),
      rs.string("experiment_type"),
      rs.int("total_modular"))

  /** All experiments of a service (list-cached). */
  def finds(serviceId: Int)(implicit session: DBSession = AutoSession): List[Experiment] =
    withCaches("serviceId=" + serviceId) {
      sql"""select * from experiments where service_id = ${serviceId}"""
        .map(Experiment(_)).list().apply()
    }

  /** Experiment lookup by (service, name). */
  def findBy(serviceId: Int, name: String)(implicit session: DBSession = AutoSession): Option[Experiment] =
    withCache("serviceId=" + serviceId + ":name=" + name) {
      sql"""select * from experiments where service_id = ${serviceId} and name = ${name}"""
        .map(Experiment(_)).single.apply
    }

  /** Experiment lookup by primary key. */
  def findById(id: Int)(implicit session: DBSession = AutoSession): Option[Experiment] =
    withCache("id=" + id)(
      sql"""select * from experiments where id = ${id}"""
        .map(Experiment(_)).single.apply
    )
}

case class Experiment(id: Option[Int],
                      serviceId: Int,
                      name: String,
                      description: String,
                      experimentType: String,
                      totalModular: Int) {

  /** Buckets attached to this experiment (requires a persisted id). */
  def buckets = Bucket.finds(id.get)

  /** Buckets carrying a well-formed "from~to" modular range, keyed by that range. */
  def rangeBuckets = for {
    bucket <- buckets
    range <- bucket.rangeOpt
  } yield range -> bucket

  /**
   * Picks the bucket for a caller. An explicit impression id wins; otherwise
   * "u"-type experiments hash the uuid into 1..totalModular for a sticky
   * assignment, and any other type assigns uniformly at random per call.
   */
  def findBucket(uuid: String, impIdOpt: Option[String] = None): Option[Bucket] =
    impIdOpt match {
      case Some(impId) =>
        Bucket.findByImpressionId(impId)
      case None =>
        val seed =
          if (experimentType == "u") (GraphUtil.murmur3(uuid) % totalModular) + 1
          else Random.nextInt(totalModular) + 1
        findBucket(seed)
    }

  /** First bucket whose inclusive [from, to] range contains uuidMod. */
  def findBucket(uuidMod: Int): Option[Bucket] =
    rangeBuckets.collectFirst {
      case ((from, to), bucket) if from <= uuidMod && uuidMod <= to => bucket
    }
}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Label.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Label.scala b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Label.scala
new file mode 100644
index 0000000..0958c2c
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Label.scala
@@ -0,0 +1,372 @@
package org.apache.s2graph.core.mysqls

import org.apache.s2graph.core.GraphExceptions.ModelNotFoundException
import org.apache.s2graph.core.Management.JsonModel.{Index, Prop}
import org.apache.s2graph.core.utils.logger
import org.apache.s2graph.core.{GraphExceptions, GraphUtil, JSONParser}
import
play.api.libs.json.Json +import scalikejdbc._ + +object Label extends Model[Label] { + + val maxHBaseTableNames = 2 + + def apply(rs: WrappedResultSet): Label = { + Label(Option(rs.int("id")), rs.string("label"), + rs.int("src_service_id"), rs.string("src_column_name"), rs.string("src_column_type"), + rs.int("tgt_service_id"), rs.string("tgt_column_name"), rs.string("tgt_column_type"), + rs.boolean("is_directed"), rs.string("service_name"), rs.int("service_id"), rs.string("consistency_level"), + rs.string("hbase_table_name"), rs.intOpt("hbase_table_ttl"), rs.string("schema_version"), rs.boolean("is_async"), rs.string("compressionAlgorithm")) + } + + def deleteAll(label: Label)(implicit session: DBSession) = { + val id = label.id + LabelMeta.findAllByLabelId(id.get, false).foreach { x => LabelMeta.delete(x.id.get) } + LabelIndex.findByLabelIdAll(id.get, false).foreach { x => LabelIndex.delete(x.id.get) } + Label.delete(id.get) + } + + def findByName(labelName: String, useCache: Boolean = true)(implicit session: DBSession = AutoSession): Option[Label] = { + val cacheKey = "label=" + labelName + lazy val labelOpt = + sql""" + select * + from labels + where label = ${labelName}""".map { rs => Label(rs) }.single.apply() + + if (useCache) withCache(cacheKey)(labelOpt) + else labelOpt + } + + def insert(label: String, + srcServiceId: Int, + srcColumnName: String, + srcColumnType: String, + tgtServiceId: Int, + tgtColumnName: String, + tgtColumnType: String, + isDirected: Boolean, + serviceName: String, + serviceId: Int, + consistencyLevel: String, + hTableName: String, + hTableTTL: Option[Int], + schemaVersion: String, + isAsync: Boolean, + compressionAlgorithm: String)(implicit session: DBSession = AutoSession) = { + sql""" + insert into labels(label, + src_service_id, src_column_name, src_column_type, + tgt_service_id, tgt_column_name, tgt_column_type, + is_directed, service_name, service_id, consistency_level, hbase_table_name, hbase_table_ttl, schema_version, 
is_async, compressionAlgorithm) + values (${label}, + ${srcServiceId}, ${srcColumnName}, ${srcColumnType}, + ${tgtServiceId}, ${tgtColumnName}, ${tgtColumnType}, + ${isDirected}, ${serviceName}, ${serviceId}, ${consistencyLevel}, ${hTableName}, ${hTableTTL}, + ${schemaVersion}, ${isAsync}, ${compressionAlgorithm}) + """ + .updateAndReturnGeneratedKey.apply() + } + + def findByIdOpt(id: Int)(implicit session: DBSession = AutoSession): Option[Label] = { + val cacheKey = "id=" + id + withCache(cacheKey)( + sql""" + select * + from labels + where id = ${id}""" + .map { rs => Label(rs) }.single.apply()) + } + + def findById(id: Int)(implicit session: DBSession = AutoSession): Label = { + val cacheKey = "id=" + id + withCache(cacheKey)( + sql""" + select * + from labels + where id = ${id}""" + .map { rs => Label(rs) }.single.apply()).get + } + + def findByTgtColumnId(columnId: Int)(implicit session: DBSession = AutoSession): List[Label] = { + val cacheKey = "tgtColumnId=" + columnId + val col = ServiceColumn.findById(columnId) + withCaches(cacheKey)( + sql""" + select * + from labels + where tgt_column_name = ${col.columnName} + and service_id = ${col.serviceId} + """.map { rs => Label(rs) }.list().apply()) + } + + def findBySrcColumnId(columnId: Int)(implicit session: DBSession = AutoSession): List[Label] = { + val cacheKey = "srcColumnId=" + columnId + val col = ServiceColumn.findById(columnId) + withCaches(cacheKey)( + sql""" + select * + from labels + where src_column_name = ${col.columnName} + and service_id = ${col.serviceId} + """.map { rs => Label(rs) }.list().apply()) + } + + def findBySrcServiceId(serviceId: Int)(implicit session: DBSession = AutoSession): List[Label] = { + val cacheKey = "srcServiceId=" + serviceId + withCaches(cacheKey)( + sql"""select * from labels where src_service_id = ${serviceId}""".map { rs => Label(rs) }.list().apply + ) + } + + def findByTgtServiceId(serviceId: Int)(implicit session: DBSession = AutoSession): List[Label] = { + val 
cacheKey = "tgtServiceId=" + serviceId + withCaches(cacheKey)( + sql"""select * from labels where tgt_service_id = ${serviceId}""".map { rs => Label(rs) }.list().apply + ) + } + + def insertAll(labelName: String, srcServiceName: String, srcColumnName: String, srcColumnType: String, + tgtServiceName: String, tgtColumnName: String, tgtColumnType: String, + isDirected: Boolean = true, + serviceName: String, + indices: Seq[Index], + metaProps: Seq[Prop], + consistencyLevel: String, + hTableName: Option[String], + hTableTTL: Option[Int], + schemaVersion: String, + isAsync: Boolean, + compressionAlgorithm: String)(implicit session: DBSession = AutoSession): Label = { + + val srcServiceOpt = Service.findByName(srcServiceName, useCache = false) + val tgtServiceOpt = Service.findByName(tgtServiceName, useCache = false) + val serviceOpt = Service.findByName(serviceName, useCache = false) + if (srcServiceOpt.isEmpty) throw new RuntimeException(s"source service $srcServiceName is not created.") + if (tgtServiceOpt.isEmpty) throw new RuntimeException(s"target service $tgtServiceName is not created.") + if (serviceOpt.isEmpty) throw new RuntimeException(s"service $serviceName is not created.") + + val newLabel = for { + srcService <- srcServiceOpt + tgtService <- tgtServiceOpt + service <- serviceOpt + } yield { + val srcServiceId = srcService.id.get + val tgtServiceId = tgtService.id.get + val serviceId = service.id.get + + /** insert serviceColumn */ + val srcCol = ServiceColumn.findOrInsert(srcServiceId, srcColumnName, Some(srcColumnType), schemaVersion) + val tgtCol = ServiceColumn.findOrInsert(tgtServiceId, tgtColumnName, Some(tgtColumnType), schemaVersion) + + if (srcCol.columnType != srcColumnType) throw new RuntimeException(s"source service column type not matched ${srcCol.columnType} != ${srcColumnType}") + if (tgtCol.columnType != tgtColumnType) throw new RuntimeException(s"target service column type not matched ${tgtCol.columnType} != ${tgtColumnType}") + + /** create 
label */ + Label.findByName(labelName, useCache = false).getOrElse { + + val createdId = insert(labelName, srcServiceId, srcColumnName, srcColumnType, + tgtServiceId, tgtColumnName, tgtColumnType, isDirected, serviceName, serviceId, consistencyLevel, + hTableName.getOrElse(service.hTableName), hTableTTL.orElse(service.hTableTTL), schemaVersion, isAsync, compressionAlgorithm).toInt + + val labelMetaMap = metaProps.map { case Prop(propName, defaultValue, dataType) => + val labelMeta = LabelMeta.findOrInsert(createdId, propName, defaultValue, dataType) + (propName -> labelMeta.seq) + }.toMap ++ LabelMeta.reservedMetas.map (labelMeta => labelMeta.name -> labelMeta.seq).toMap + + if (indices.isEmpty) { + // make default index with _PK, _timestamp, 0 + LabelIndex.findOrInsert(createdId, LabelIndex.DefaultName, LabelIndex.DefaultMetaSeqs.toList, "none") + } else { + indices.foreach { index => + val metaSeq = index.propNames.map { name => labelMetaMap(name) } + LabelIndex.findOrInsert(createdId, index.name, metaSeq.toList, "none") + } + } + + val cacheKeys = List(s"id=$createdId", s"label=$labelName") + val ret = findByName(labelName, useCache = false).get + putsToCache(cacheKeys.map(k => k -> ret)) + ret + } + } + + newLabel.getOrElse(throw new RuntimeException("failed to create label")) + } + + def findAll()(implicit session: DBSession = AutoSession) = { + val ls = sql"""select * from labels""".map { rs => Label(rs) }.list().apply() + putsToCache(ls.map { x => + val cacheKey = s"id=${x.id.get}" + (cacheKey -> x) + }) + putsToCache(ls.map { x => + val cacheKey = s"label=${x.label}" + (cacheKey -> x) + }) + } + + def updateName(oldName: String, newName: String)(implicit session: DBSession = AutoSession) = { + logger.info(s"rename label: $oldName -> $newName") + sql"""update labels set label = ${newName} where label = ${oldName}""".update.apply() + } + + def updateHTableName(labelName: String, newHTableName: String)(implicit session: DBSession = AutoSession) = { + 
logger.info(s"update HTable of label $labelName to $newHTableName")
    val cnt = sql"""update labels set hbase_table_name = $newHTableName where label = $labelName""".update().apply()
    val label = Label.findByName(labelName, useCache = false).get

    // BUGFIX: findById caches under "id=<Int>"; interpolating the Option
    // directly produced the key "id=Some(<n>)", so the stale cached Label
    // (with the old hbase table name) was never expired. Use the unwrapped id.
    val cacheKeys = List(s"id=${label.id.get}", s"label=${label.label}")
    cacheKeys.foreach { key =>
      expireCache(key)
      expireCaches(key)
    }
    cnt
  }

  /** Deletes a label row and evicts its "id=" / "label=" cache entries. */
  def delete(id: Int)(implicit session: DBSession = AutoSession) = {
    val label = findById(id)
    logger.info(s"delete label: $label")
    val cnt = sql"""delete from labels where id = ${label.id.get}""".update().apply()
    val cacheKeys = List(s"id=$id", s"label=${label.label}")
    cacheKeys.foreach { key =>
      expireCache(key)
      expireCaches(key)
    }
    cnt
  }
}

case class Label(id: Option[Int], label: String,
                 srcServiceId: Int, srcColumnName: String, srcColumnType: String,
                 tgtServiceId: Int, tgtColumnName: String, tgtColumnType: String,
                 isDirected: Boolean = true, serviceName: String, serviceId: Int, consistencyLevel: String = "strong",
                 hTableName: String, hTableTTL: Option[Int],
                 schemaVersion: String, isAsync: Boolean = false,
                 compressionAlgorithm: String) extends JSONParser {
  // NOTE(review): `id.get` assumes a persisted label — throws on an unsaved instance.
  def metas = LabelMeta.findAllByLabelId(id.get)

  // idiom: dotted .toMap instead of deprecated postfix operator syntax
  def metaSeqsToNames = metas.map(x => (x.seq, x.name)).toMap

  // lazy val firstHBaseTableName = hbaseTableName.split(",").headOption.getOrElse(Config.HBASE_TABLE_NAME)
  lazy val srcService = Service.findById(srcServiceId)
  lazy val tgtService = Service.findById(tgtServiceId)
  lazy val service = Service.findById(serviceId)
  /**
   * TODO
   * change this to apply hbase table from target serviceName
   */
  // lazy val (hbaseZkAddr, hbaseTableName) = (service.cluster, service.tableName.split(",").headOption.getOrElse(Config.HBASE_TABLE_NAME))
  // lazy val (hbaseZkAddr, hbaseTableName) = (Config.HBASE_ZOOKEEPER_QUORUM, hTableName.split(",").headOption.getOrElse(Config.HBASE_TABLE_NAME))
  // lazy val (hbaseZkAddr, hbaseTableName) =
(service.cluster, hTableName.split(",").headOption.getOrElse(GraphConnection.getConfVal("hbase.table.name"))) + lazy val (hbaseZkAddr, hbaseTableName) = (service.cluster, hTableName.split(",").head) + + lazy val srcColumn = ServiceColumn.find(srcServiceId, srcColumnName).getOrElse(throw ModelNotFoundException("Source column not found")) + lazy val tgtColumn = ServiceColumn.find(tgtServiceId, tgtColumnName).getOrElse(throw ModelNotFoundException("Target column not found")) + + lazy val direction = if (isDirected) "out" else "undirected" + lazy val defaultIndex = LabelIndex.findByLabelIdAndSeq(id.get, LabelIndex.DefaultSeq) + + //TODO: Make sure this is correct + lazy val indices = LabelIndex.findByLabelIdAll(id.get, useCache = true) + lazy val indicesMap = indices.map(idx => (idx.seq, idx)) toMap + lazy val indexSeqsMap = indices.map(idx => (idx.metaSeqs, idx)) toMap + lazy val indexNameMap = indices.map(idx => (idx.name, idx)) toMap + lazy val extraIndices = indices.filter(idx => defaultIndex.isDefined && idx.id.get != defaultIndex.get.id.get) + // indices filterNot (_.id.get == defaultIndex.get.id.get) + lazy val extraIndicesMap = extraIndices.map(idx => (idx.seq, idx)) toMap + + lazy val metaProps = LabelMeta.reservedMetas.map { m => + if (m == LabelMeta.to) m.copy(dataType = tgtColumnType) + else if (m == LabelMeta.from) m.copy(dataType = srcColumnType) + else m + } ::: LabelMeta.findAllByLabelId(id.get, useCache = true) + + lazy val metaPropsInner = LabelMeta.reservedMetasInner.map { m => + if (m == LabelMeta.to) m.copy(dataType = tgtColumnType) + else if (m == LabelMeta.from) m.copy(dataType = srcColumnType) + else m + } ::: LabelMeta.findAllByLabelId(id.get, useCache = true) + + lazy val metaPropsMap = metaProps.map(x => (x.seq, x)).toMap + lazy val metaPropsInvMap = metaProps.map(x => (x.name, x)).toMap + lazy val metaPropNames = metaProps.map(x => x.name) + lazy val metaPropNamesMap = metaProps.map(x => (x.seq, x.name)) toMap + + /** this is used only by 
edgeToProps */ + lazy val metaPropsDefaultMap = (for { + prop <- metaProps if LabelMeta.isValidSeq(prop.seq) + jsValue <- innerValToJsValue(toInnerVal(prop.defaultValue, prop.dataType, schemaVersion), prop.dataType) + } yield prop.name -> jsValue).toMap + + lazy val metaPropsDefaultMapInner = (for { + prop <- metaPropsInner if LabelMeta.isValidSeq(prop.seq) + jsValue <- innerValToJsValue(toInnerVal(prop.defaultValue, prop.dataType, schemaVersion), prop.dataType) + } yield prop.name -> jsValue).toMap + + def srcColumnWithDir(dir: Int) = { + if (dir == GraphUtil.directions("out")) srcColumn else tgtColumn + } + + def tgtColumnWithDir(dir: Int) = { + if (dir == GraphUtil.directions("out")) tgtColumn else srcColumn + } + + def srcTgtColumn(dir: Int) = + if (isDirected) { + (srcColumnWithDir(dir), tgtColumnWithDir(dir)) + } else { + if (dir == GraphUtil.directions("in")) { + (tgtColumn, srcColumn) + } else { + (srcColumn, tgtColumn) + } + } + + def init() = { + metas + metaSeqsToNames + service + srcColumn + tgtColumn + defaultIndex + indices + metaProps + } + + // def srcColumnInnerVal(jsValue: JsValue) = { + // jsValueToInnerVal(jsValue, srcColumnType, version) + // } + // def tgtColumnInnerVal(jsValue: JsValue) = { + // jsValueToInnerVal(jsValue, tgtColumnType, version) + // } + + override def toString(): String = { + val orderByKeys = LabelMeta.findAllByLabelId(id.get) + super.toString() + orderByKeys.toString() + } + + // def findLabelIndexSeq(scoring: List[(Byte, Double)]): Byte = { + // if (scoring.isEmpty) LabelIndex.defaultSeq + // else { + // LabelIndex.findByLabelIdAndSeqs(id.get, scoring.map(_._1).sorted).map(_.seq).getOrElse(LabelIndex.defaultSeq) + // + //// LabelIndex.findByLabelIdAndSeqs(id.get, scoring.map(_._1).sorted).map(_.seq).getOrElse(LabelIndex.defaultSeq) + // } + // } + lazy val toJson = Json.obj("labelName" -> label, + "from" -> srcColumn.toJson, "to" -> tgtColumn.toJson, + "isDirected" -> isDirected, + "serviceName" -> serviceName, + 
"consistencyLevel" -> consistencyLevel, + "schemaVersion" -> schemaVersion, + "isAsync" -> isAsync, + "compressionAlgorithm" -> compressionAlgorithm, + "defaultIndex" -> defaultIndex.map(x => x.toJson), + "extraIndex" -> extraIndices.map(exIdx => exIdx.toJson), + "metaProps" -> metaProps.filter { labelMeta => LabelMeta.isValidSeqForAdmin(labelMeta.seq) }.map(_.toJson) + ) + + +} + http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2core/src/main/scala/org/apache/s2graph/core/mysqls/LabelIndex.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/LabelIndex.scala b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/LabelIndex.scala new file mode 100644 index 0000000..31e63c3 --- /dev/null +++ b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/LabelIndex.scala @@ -0,0 +1,143 @@ +package org.apache.s2graph.core.mysqls + +/** + * Created by shon on 6/3/15. + */ + +import play.api.libs.json.Json +import scalikejdbc._ + +object LabelIndex extends Model[LabelIndex] { + val DefaultName = "_PK" + val DefaultMetaSeqs = Seq(LabelMeta.timeStampSeq) + val DefaultSeq = 1.toByte + val MaxOrderSeq = 7 + + def apply(rs: WrappedResultSet): LabelIndex = { + LabelIndex(rs.intOpt("id"), rs.int("label_id"), rs.string("name"), rs.byte("seq"), + rs.string("meta_seqs").split(",").filter(_ != "").map(s => s.toByte).toList match { + case metaSeqsList => metaSeqsList + }, + rs.string("formulars")) + } + + def findById(id: Int)(implicit session: DBSession = AutoSession) = { + val cacheKey = "id=" + id + withCache(cacheKey) { + sql"""select * from label_indices where id = ${id}""".map { rs => LabelIndex(rs) }.single.apply + }.get + } + + def findByLabelIdAll(labelId: Int, useCache: Boolean = true)(implicit session: DBSession = AutoSession) = { + val cacheKey = "labelId=" + labelId + if (useCache) { + withCaches(cacheKey)( sql""" + select * from label_indices where label_id = 
${labelId} and seq > 0 order by seq ASC + """.map { rs => LabelIndex(rs) }.list.apply) + } else { + sql""" + select * from label_indices where label_id = ${labelId} and seq > 0 order by seq ASC + """.map { rs => LabelIndex(rs) }.list.apply + } + } + + def insert(labelId: Int, indexName: String, seq: Byte, metaSeqs: List[Byte], formulars: String)(implicit session: DBSession = AutoSession): Long = { + sql""" + insert into label_indices(label_id, name, seq, meta_seqs, formulars) + values (${labelId}, ${indexName}, ${seq}, ${metaSeqs.mkString(",")}, ${formulars}) + """ + .updateAndReturnGeneratedKey.apply() + } + + def findOrInsert(labelId: Int, indexName: String, metaSeqs: List[Byte], formulars: String)(implicit session: DBSession = AutoSession): LabelIndex = { + findByLabelIdAndSeqs(labelId, metaSeqs) match { + case Some(s) => s + case None => + val orders = findByLabelIdAll(labelId, false) + val seq = (orders.size + 1).toByte + assert(seq <= MaxOrderSeq) + val createdId = insert(labelId, indexName, seq, metaSeqs, formulars) + val cacheKeys = List(s"labelId=$labelId:seq=$seq", + s"labelId=$labelId:seqs=$metaSeqs", s"labelId=$labelId:seq=$seq", s"id=$createdId") + cacheKeys.foreach { key => + expireCache(key) + expireCaches(key) + } + + findByLabelIdAndSeq(labelId, seq).get + } + } + + def findByLabelIdAndSeqs(labelId: Int, seqs: List[Byte])(implicit session: DBSession = AutoSession): Option[LabelIndex] = { + val cacheKey = "labelId=" + labelId + ":seqs=" + seqs.mkString(",") + withCache(cacheKey) { + sql""" + select * from label_indices where label_id = ${labelId} and meta_seqs = ${seqs.mkString(",")} + """.map { rs => LabelIndex(rs) }.single.apply + } + } + + def findByLabelIdAndSeq(labelId: Int, seq: Byte, useCache: Boolean = true)(implicit session: DBSession = AutoSession): Option[LabelIndex] = { + // val cacheKey = s"labelId=$labelId:seq=$seq" + val cacheKey = "labelId=" + labelId + ":seq=" + seq + if (useCache) { + withCache(cacheKey)( sql""" + select * from 
label_indices where label_id = ${labelId} and seq = ${seq} + """.map { rs => LabelIndex(rs) }.single.apply) + } else { + sql""" + select * from label_indices where label_id = ${labelId} and seq = ${seq} + """.map { rs => LabelIndex(rs) }.single.apply + } + } + + def delete(id: Int)(implicit session: DBSession = AutoSession) = { + val labelIndex = findById(id) + val seqs = labelIndex.metaSeqs.mkString(",") + val (labelId, seq) = (labelIndex.labelId, labelIndex.seq) + sql"""delete from label_indices where id = ${id}""".execute.apply() + + val cacheKeys = List(s"id=$id", s"labelId=$labelId", s"labelId=$labelId:seq=$seq", s"labelId=$labelId:seqs=$seqs") + cacheKeys.foreach { key => + expireCache(key) + expireCaches(key) + } + } + + def findAll()(implicit session: DBSession = AutoSession) = { + val ls = sql"""select * from label_indices""".map { rs => LabelIndex(rs) }.list.apply + putsToCache(ls.map { x => + var cacheKey = s"id=${x.id.get}" + (cacheKey -> x) + }) + putsToCache(ls.map { x => + var cacheKey = s"labelId=${x.labelId}:seq=${x.seq}}" + (cacheKey -> x) + }) + putsToCache(ls.map { x => + var cacheKey = s"labelId=${x.labelId}:seqs=${x.metaSeqs.mkString(",")}" + (cacheKey -> x) + }) + putsToCaches(ls.groupBy(x => x.labelId).map { case (labelId, ls) => + val cacheKey = s"labelId=${labelId}" + (cacheKey -> ls) + }.toList) + } +} + +/** + * formular + * ex1): w1, w2, w3 + * ex2): 1.5 * w1^2 + 3.4 * (w1 * w2), w2, w1 + */ + +case class LabelIndex(id: Option[Int], labelId: Int, name: String, seq: Byte, metaSeqs: Seq[Byte], formulars: String) { + lazy val label = Label.findById(labelId) + lazy val metas = label.metaPropsMap + lazy val sortKeyTypes = metaSeqs.flatMap(metaSeq => label.metaPropsMap.get(metaSeq)) + lazy val propNames = sortKeyTypes.map { labelMeta => labelMeta.name } + lazy val toJson = Json.obj( + "name" -> name, + "propNames" -> sortKeyTypes.map(x => x.name) + ) +}