http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2core/src/main/scala/com/kakao/s2graph/core/PostProcess.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/PostProcess.scala b/s2core/src/main/scala/com/kakao/s2graph/core/PostProcess.scala deleted file mode 100644 index f301d68..0000000 --- a/s2core/src/main/scala/com/kakao/s2graph/core/PostProcess.scala +++ /dev/null @@ -1,554 +0,0 @@ -package com.kakao.s2graph.core - -import com.kakao.s2graph.core.GraphExceptions.BadQueryException - -import com.kakao.s2graph.core.mysqls.{ColumnMeta, Label, ServiceColumn, LabelMeta} -import com.kakao.s2graph.core.types.{InnerVal, InnerValLike} -import com.kakao.s2graph.core.utils.logger -import play.api.libs.json.{Json, _} -import scala.collection.mutable.{ArrayBuffer, ListBuffer} - -object PostProcess extends JSONParser { - - - type EDGE_VALUES = Map[String, JsValue] - type ORDER_BY_VALUES = (Any, Any, Any, Any) - type RAW_EDGE = (EDGE_VALUES, Double, ORDER_BY_VALUES) - - /** - * Result Entity score field name - */ - val emptyDegrees = Seq.empty[JsValue] - val timeoutResults = Json.obj("size" -> 0, "degrees" -> Json.arr(), "results" -> Json.arr(), "isTimeout" -> true) - val emptyResults = Json.obj("size" -> 0, "degrees" -> Json.arr(), "results" -> Json.arr(), "isEmpty" -> true) - def badRequestResults(ex: => Exception) = ex match { - case ex: BadQueryException => Json.obj("message" -> ex.msg) - case _ => Json.obj("message" -> ex.getMessage) - } - - val SCORE_FIELD_NAME = "scoreSum" - val reservedColumns = Set("cacheRemain", "from", "to", "label", "direction", "_timestamp", "timestamp", "score", "props") - - def groupEdgeResult(queryRequestWithResultLs: Seq[QueryRequestWithResult], exclude: Seq[QueryRequestWithResult]) = { - val excludeIds = resultInnerIds(exclude).map(innerId => innerId -> true).toMap - // filterNot {case (edge, score) => edge.props.contains(LabelMeta.degreeSeq)} - val groupedEdgesWithRank = (for { - queryRequestWithResult <- queryRequestWithResultLs - (queryRequest, queryResult) = QueryRequestWithResult.unapply(queryRequestWithResult).get - edgeWithScore <- queryResult.edgeWithScoreLs - (edge, score) = EdgeWithScore.unapply(edgeWithScore).get - if !excludeIds.contains(toHashKey(edge, queryRequest.queryParam, queryRequest.query.filterOutFields)) - } yield { - (queryRequest.queryParam, edge, score) - }).groupBy { - case (queryParam, edge, rank) if edge.labelWithDir.dir == GraphUtil.directions("in") => - (queryParam.label.srcColumn, queryParam.label.label, queryParam.label.tgtColumn, edge.tgtVertex.innerId, edge.isDegree) - case (queryParam, edge, rank) => - (queryParam.label.tgtColumn, queryParam.label.label, queryParam.label.srcColumn, edge.tgtVertex.innerId, edge.isDegree) - } - - val ret = for { - ((tgtColumn, labelName, srcColumn, target, isDegreeEdge), edgesAndRanks) <- groupedEdgesWithRank if !isDegreeEdge - edgesWithRanks = edgesAndRanks.groupBy(x => x._2.srcVertex).map(_._2.head) - id <- innerValToJsValue(target, tgtColumn.columnType) - } yield { - Json.obj("name" -> tgtColumn.columnName, "id" -> id, - SCORE_FIELD_NAME -> edgesWithRanks.map(_._3).sum, - "label" -> labelName, - "aggr" -> Json.obj( - "name" -> srcColumn.columnName, - "ids" -> edgesWithRanks.flatMap { case (queryParam, edge, rank) => - innerValToJsValue(edge.srcVertex.innerId, srcColumn.columnType) - }, - "edges" -> edgesWithRanks.map { case (queryParam, edge, rank) => - Json.obj("id" -> innerValToJsValue(edge.srcVertex.innerId, srcColumn.columnType), - "props" -> propsToJson(edge), - "score" -> rank - ) - } - ) - ) - } - - ret.toList - } - - def sortWithFormatted(jsons: Seq[JsObject], - scoreField: String = "scoreSum", - queryRequestWithResultLs: Seq[QueryRequestWithResult], - decrease: Boolean = true): JsObject = { - val ordering = if (decrease) -1 else 1 - val sortedJsons = jsons.sortBy { jsObject => (jsObject \ scoreField).as[Double] * ordering } - - if (queryRequestWithResultLs.isEmpty) Json.obj("size" -> sortedJsons.size, "results" -> sortedJsons) - else Json.obj( - "size" -> sortedJsons.size, - "results" -> sortedJsons, - "impressionId" -> queryRequestWithResultLs.head.queryRequest.query.impressionId() - ) - } - - private def toHashKey(edge: Edge, queryParam: QueryParam, fields: Seq[String], delimiter: String = ","): Int = { - val ls = for { - field <- fields - } yield { - field match { - case "from" | "_from" => edge.srcVertex.innerId.toIdString() - case "to" | "_to" => edge.tgtVertex.innerId.toIdString() - case "label" => edge.labelWithDir.labelId - case "direction" => JsString(GraphUtil.fromDirection(edge.labelWithDir.dir)) - case "_timestamp" | "timestamp" => edge.ts - case _ => - queryParam.label.metaPropsInvMap.get(field) match { - case None => throw new RuntimeException(s"unknow column: $field") - case Some(labelMeta) => edge.propsWithTs.get(labelMeta.seq) match { - case None => labelMeta.defaultValue - case Some(propVal) => propVal - } - } - } - } - val ret = ls.hashCode() - ret - } - - def resultInnerIds(queryRequestWithResultLs: Seq[QueryRequestWithResult], isSrcVertex: Boolean = false): Seq[Int] = { - for { - queryRequestWithResult <- queryRequestWithResultLs - (queryRequest, queryResult) = QueryRequestWithResult.unapply(queryRequestWithResult).get - q = queryRequest.query - edgeWithScore <- queryResult.edgeWithScoreLs - (edge, score) = EdgeWithScore.unapply(edgeWithScore).get - } yield toHashKey(edge, queryRequest.queryParam, q.filterOutFields) - } - - def summarizeWithListExcludeFormatted(queryRequestWithResultLs: Seq[QueryRequestWithResult], exclude: Seq[QueryRequestWithResult]) = { - val jsons = groupEdgeResult(queryRequestWithResultLs, exclude) - sortWithFormatted(jsons, queryRequestWithResultLs = queryRequestWithResultLs, decrease = true) - } - - def summarizeWithList(queryRequestWithResultLs: Seq[QueryRequestWithResult], exclude: Seq[QueryRequestWithResult]) = { - val jsons = groupEdgeResult(queryRequestWithResultLs, exclude) - sortWithFormatted(jsons, queryRequestWithResultLs = queryRequestWithResultLs) - } - - def summarizeWithListFormatted(queryRequestWithResultLs: Seq[QueryRequestWithResult], exclude: Seq[QueryRequestWithResult]) = { - val jsons = groupEdgeResult(queryRequestWithResultLs, exclude) - sortWithFormatted(jsons, queryRequestWithResultLs = queryRequestWithResultLs) - } - - def toSimpleVertexArrJson(queryRequestWithResultLs: Seq[QueryRequestWithResult]): JsValue = { - toSimpleVertexArrJson(queryRequestWithResultLs, Seq.empty[QueryRequestWithResult]) - } - - private def orderBy(queryOption: QueryOption, - rawEdges: ListBuffer[(EDGE_VALUES, Double, ORDER_BY_VALUES)]): ListBuffer[(EDGE_VALUES, Double, ORDER_BY_VALUES)] = { - import com.kakao.s2graph.core.OrderingUtil._ - - if (queryOption.withScore && queryOption.orderByColumns.nonEmpty) { - val ascendingLs = queryOption.orderByColumns.map(_._2) - rawEdges.sortBy(_._3)(TupleMultiOrdering[Any](ascendingLs)) - } else { - rawEdges - } - } - - private def getColumnValue(keyWithJs: Map[String, JsValue], score: Double, edge: Edge, column: String): Any = { - column match { - case "score" => score - case "timestamp" | "_timestamp" => edge.ts - case _ => - keyWithJs.get(column) match { - case None => keyWithJs.get("props").map { js => (js \ column).as[JsValue] }.get - case Some(x) => x - } - } - } - - private def buildReplaceJson(jsValue: JsValue)(mapper: JsValue => JsValue): JsValue = { - def traverse(js: JsValue): JsValue = js match { - case JsNull => mapper(JsNull) - case JsUndefined() => mapper(JsUndefined("")) - case JsNumber(v) => mapper(js) - case JsString(v) => mapper(js) - case JsBoolean(v) => mapper(js) - case JsArray(elements) => JsArray(elements.map { t => traverse(mapper(t)) }) - case JsObject(values) => JsObject(values.map { case (k, v) => k -> traverse(mapper(v)) }) - } - - traverse(jsValue) - } - - /** test query with filterOut is not working since it can not diffrentate filterOut */ - private def buildNextQuery(jsonQuery: JsValue, _cursors: Seq[Seq[String]]): JsValue = { - val cursors = _cursors.flatten.iterator - - buildReplaceJson(jsonQuery) { - case js@JsObject(fields) => - val isStep = fields.find { case (k, _) => k == "label" } // find label group - if (isStep.isEmpty) js - else { - // TODO: Order not ensured - val withCursor = js.fieldSet | Set("cursor" -> JsString(cursors.next)) - JsObject(withCursor.toSeq) - } - case js => js - } - } - - private def buildRawEdges(queryOption: QueryOption, - queryRequestWithResultLs: Seq[QueryRequestWithResult], - excludeIds: Map[Int, Boolean], - scoreWeight: Double = 1.0): (ListBuffer[JsValue], ListBuffer[RAW_EDGE]) = { - val degrees = ListBuffer[JsValue]() - val rawEdges = ListBuffer[(EDGE_VALUES, Double, ORDER_BY_VALUES)]() - val metaPropNamesMap = scala.collection.mutable.Map[String, Int]() - for { - queryRequestWithResult <- queryRequestWithResultLs - } yield { - val (queryRequest, _) = QueryRequestWithResult.unapply(queryRequestWithResult).get - queryRequest.queryParam.label.metaPropNames.foreach { metaPropName => - metaPropNamesMap.put(metaPropName, metaPropNamesMap.getOrElse(metaPropName, 0) + 1) - } - } - val propsExistInAll = metaPropNamesMap.filter(kv => kv._2 == queryRequestWithResultLs.length) - val orderByColumns = queryOption.orderByColumns.filter { case (column, _) => - column match { - case "from" | "to" | "label" | "score" | "timestamp" | "_timestamp" => true - case _ => - propsExistInAll.contains(column) -// //TODO?? -// false -// queryParam.label.metaPropNames.contains(column) - } - } - - /** build result jsons */ - for { - queryRequestWithResult <- queryRequestWithResultLs - (queryRequest, queryResult) = QueryRequestWithResult.unapply(queryRequestWithResult).get - queryParam = queryRequest.queryParam - edgeWithScore <- queryResult.edgeWithScoreLs - (edge, score) = EdgeWithScore.unapply(edgeWithScore).get - hashKey = toHashKey(edge, queryRequest.queryParam, queryOption.filterOutFields) - if !excludeIds.contains(hashKey) - } { - - // edge to json - val (srcColumn, _) = queryParam.label.srcTgtColumn(edge.labelWithDir.dir) - val fromOpt = innerValToJsValue(edge.srcVertex.id.innerId, srcColumn.columnType) - if (edge.isDegree && fromOpt.isDefined) { - if (queryOption.limitOpt.isEmpty) { - degrees += Json.obj( - "from" -> fromOpt.get, - "label" -> queryRequest.queryParam.label.label, - "direction" -> GraphUtil.fromDirection(edge.labelWithDir.dir), - LabelMeta.degree.name -> innerValToJsValue(edge.propsWithTs(LabelMeta.degreeSeq).innerVal, InnerVal.LONG) - ) - } - } else { - val keyWithJs = edgeToJson(edge, score, queryRequest.query, queryRequest.queryParam) - val orderByValues: (Any, Any, Any, Any) = orderByColumns.length match { - case 0 => - (None, None, None, None) - case 1 => - val it = orderByColumns.iterator - val v1 = getColumnValue(keyWithJs, score, edge, it.next()._1) - (v1, None, None, None) - case 2 => - val it = orderByColumns.iterator - val v1 = getColumnValue(keyWithJs, score, edge, it.next()._1) - val v2 = getColumnValue(keyWithJs, score, edge, it.next()._1) - (v1, v2, None, None) - case 3 => - val it = orderByColumns.iterator - val v1 = getColumnValue(keyWithJs, score, edge, it.next()._1) - val v2 = getColumnValue(keyWithJs, score, edge, it.next()._1) - val v3 = getColumnValue(keyWithJs, score, edge, it.next()._1) - (v1, v2, v3, None) - case _ => - val it = orderByColumns.iterator - val v1 = getColumnValue(keyWithJs, score, edge, it.next()._1) - val v2 = getColumnValue(keyWithJs, score, edge, it.next()._1) - val v3 = getColumnValue(keyWithJs, score, edge, it.next()._1) - val v4 = getColumnValue(keyWithJs, score, edge, it.next()._1) - (v1, v2, v3, v4) - } - - val currentEdge = (keyWithJs, score * scoreWeight, orderByValues) - rawEdges += currentEdge - } - } - (degrees, rawEdges) - } - - private def buildResultJsValue(queryOption: QueryOption, - degrees: ListBuffer[JsValue], - rawEdges: ListBuffer[RAW_EDGE]): JsValue = { - if (queryOption.groupByColumns.isEmpty) { - // ordering - val filteredEdges = rawEdges.filter(t => t._2 >= queryOption.scoreThreshold) - - val edges = queryOption.limitOpt match { - case None => orderBy(queryOption, filteredEdges).map(_._1) - case Some(limit) => orderBy(queryOption, filteredEdges).map(_._1).take(limit) - } - val resultDegrees = if (queryOption.returnDegree) degrees else emptyDegrees - - Json.obj( - "size" -> edges.size, - "degrees" -> resultDegrees, -// "queryNext" -> buildNextQuery(q.jsonQuery, q.cursorStrings()), - "results" -> edges - ) - } else { - val grouped = rawEdges.groupBy { case (keyWithJs, _, _) => - val props = keyWithJs.get("props") - - for { - column <- queryOption.groupByColumns - value <- keyWithJs.get(column) match { - case None => props.flatMap { js => (js \ column).asOpt[JsValue] } - case Some(x) => Some(x) - } - } yield column -> value - } - - val groupedEdgesWithScoreSum = - for { - (groupByKeyVals, groupedRawEdges) <- grouped - scoreSum = groupedRawEdges.map(x => x._2).sum if scoreSum >= queryOption.scoreThreshold - } yield { - // ordering - val edges = orderBy(queryOption, groupedRawEdges).map(_._1) - - //TODO: refactor this - val js = if (queryOption.returnAgg) - Json.obj( - "groupBy" -> Json.toJson(groupByKeyVals.toMap), - "scoreSum" -> scoreSum, - "agg" -> edges - ) - else - Json.obj( - "groupBy" -> Json.toJson(groupByKeyVals.toMap), - "scoreSum" -> scoreSum, - "agg" -> Json.arr() - ) - (js, scoreSum) - } - - val groupedSortedJsons = queryOption.limitOpt match { - case None => - groupedEdgesWithScoreSum.toList.sortBy { case (jsVal, scoreSum) => scoreSum * -1 }.map(_._1) - case Some(limit) => - groupedEdgesWithScoreSum.toList.sortBy { case (jsVal, scoreSum) => scoreSum * -1 }.map(_._1).take(limit) - } - val resultDegrees = if (queryOption.returnDegree) degrees else emptyDegrees - Json.obj( - "size" -> groupedSortedJsons.size, - "degrees" -> resultDegrees, -// "queryNext" -> buildNextQuery(q.jsonQuery, q.cursorStrings()), - "results" -> groupedSortedJsons - ) - } - } - - def toSimpleVertexArrJsonMulti(queryOption: QueryOption, - resultWithExcludeLs: Seq[(Seq[QueryRequestWithResult], Seq[QueryRequestWithResult])], - excludes: Seq[QueryRequestWithResult]): JsValue = { - val excludeIds = (Seq((Seq.empty, excludes)) ++ resultWithExcludeLs).foldLeft(Map.empty[Int, Boolean]) { case (acc, (result, excludes)) => - acc ++ resultInnerIds(excludes).map(hashKey => hashKey -> true).toMap - } - - val (degrees, rawEdges) = (ListBuffer.empty[JsValue], ListBuffer.empty[RAW_EDGE]) - for { - (result, localExclude) <- resultWithExcludeLs - } { - val newResult = result.map { queryRequestWithResult => - val (queryRequest, _) = QueryRequestWithResult.unapply(queryRequestWithResult).get - val newQuery = queryRequest.query.copy(queryOption = queryOption) - queryRequestWithResult.copy(queryRequest = queryRequest.copy(query = newQuery)) - } - val (_degrees, _rawEdges) = buildRawEdges(queryOption, newResult, excludeIds) - degrees ++= _degrees - rawEdges ++= _rawEdges - } - buildResultJsValue(queryOption, degrees, rawEdges) - } - - def toSimpleVertexArrJson(queryOption: QueryOption, - queryRequestWithResultLs: Seq[QueryRequestWithResult], - exclude: Seq[QueryRequestWithResult]): JsValue = { - val excludeIds = resultInnerIds(exclude).map(innerId => innerId -> true).toMap - val (degrees, rawEdges) = buildRawEdges(queryOption, queryRequestWithResultLs, excludeIds) - buildResultJsValue(queryOption, degrees, rawEdges) - } - - - def toSimpleVertexArrJson(queryRequestWithResultLs: Seq[QueryRequestWithResult], - exclude: Seq[QueryRequestWithResult]): JsValue = { - - queryRequestWithResultLs.headOption.map { queryRequestWithResult => - val (queryRequest, _) = QueryRequestWithResult.unapply(queryRequestWithResult).get - val query = queryRequest.query - val queryOption = query.queryOption - val excludeIds = resultInnerIds(exclude).map(innerId => innerId -> true).toMap - val (degrees, rawEdges) = buildRawEdges(queryOption, queryRequestWithResultLs, excludeIds) - buildResultJsValue(queryOption, degrees, rawEdges) - } getOrElse emptyResults - } - - def verticesToJson(vertices: Iterable[Vertex]) = { - Json.toJson(vertices.flatMap { v => vertexToJson(v) }) - } - - def propsToJson(edge: Edge, q: Query, queryParam: QueryParam): Map[String, JsValue] = { - for { - (seq, innerValWithTs) <- edge.propsWithTs if LabelMeta.isValidSeq(seq) - labelMeta <- queryParam.label.metaPropsMap.get(seq) - jsValue <- innerValToJsValue(innerValWithTs.innerVal, labelMeta.dataType) - } yield labelMeta.name -> jsValue - } - - private def edgeParent(parentEdges: Seq[EdgeWithScore], q: Query, queryParam: QueryParam): JsValue = { - if (parentEdges.isEmpty) { - JsNull - } else { - val parents = for { - parent <- parentEdges - (parentEdge, parentScore) = EdgeWithScore.unapply(parent).get - parentQueryParam = QueryParam(parentEdge.labelWithDir) - parents = edgeParent(parentEdge.parentEdges, q, parentQueryParam) if parents != JsNull - } yield { - val originalEdge = parentEdge.originalEdgeOpt.getOrElse(parentEdge) - val edgeJson = edgeToJsonInner(originalEdge, parentScore, q, parentQueryParam) + ("parents" -> parents) - Json.toJson(edgeJson) - } - - Json.toJson(parents) - } - } - - /** TODO */ - def edgeToJsonInner(edge: Edge, score: Double, q: Query, queryParam: QueryParam): Map[String, JsValue] = { - val (srcColumn, tgtColumn) = queryParam.label.srcTgtColumn(edge.labelWithDir.dir) - - val kvMapOpt = for { - from <- innerValToJsValue(edge.srcVertex.id.innerId, srcColumn.columnType) - to <- innerValToJsValue(edge.tgtVertex.id.innerId, tgtColumn.columnType) - } yield { - val targetColumns = if (q.selectColumnsSet.isEmpty) reservedColumns else (reservedColumns & q.selectColumnsSet) + "props" - val _propsMap = queryParam.label.metaPropsDefaultMapInner ++ propsToJson(edge, q, queryParam) - val propsMap = if (q.selectColumnsSet.nonEmpty) _propsMap.filterKeys(q.selectColumnsSet) else _propsMap - - val kvMap = targetColumns.foldLeft(Map.empty[String, JsValue]) { (map, column) => - val jsValue = column match { - case "cacheRemain" => JsNumber(queryParam.cacheTTLInMillis - (System.currentTimeMillis() - queryParam.timestamp)) - case "from" => from - case "to" => to - case "label" => JsString(queryParam.label.label) - case "direction" => JsString(GraphUtil.fromDirection(edge.labelWithDir.dir)) - case "_timestamp" | "timestamp" => JsNumber(edge.ts) - case "score" => JsNumber(score) - case "props" if propsMap.nonEmpty => Json.toJson(propsMap) - case _ => JsNull - } - - if (jsValue == JsNull) map else map + (column -> jsValue) - } - kvMap - } - - kvMapOpt.getOrElse(Map.empty) - } - - def edgeToJson(edge: Edge, score: Double, q: Query, queryParam: QueryParam): Map[String, JsValue] = { - val kvs = edgeToJsonInner(edge, score, q, queryParam) - if (kvs.nonEmpty && q.returnTree) kvs + ("parents" -> Json.toJson(edgeParent(edge.parentEdges, q, queryParam))) - else kvs - } - - def vertexToJson(vertex: Vertex): Option[JsObject] = { - val serviceColumn = ServiceColumn.findById(vertex.id.colId) - - for { - id <- innerValToJsValue(vertex.innerId, serviceColumn.columnType) - } yield { - Json.obj("serviceName" -> serviceColumn.service.serviceName, - "columnName" -> serviceColumn.columnName, - "id" -> id, "props" -> propsToJson(vertex), - "timestamp" -> vertex.ts, - // "belongsTo" -> vertex.belongLabelIds) - "belongsTo" -> vertex.belongLabelIds.flatMap(Label.findByIdOpt(_).map(_.label))) - } - } - - private def keysToName(seqsToNames: Map[Int, String], props: Map[Int, InnerValLike]) = { - for { - (seq, value) <- props - name <- seqsToNames.get(seq) - } yield (name, value) - } - - private def propsToJson(vertex: Vertex) = { - val serviceColumn = vertex.serviceColumn - val props = for { - (propKey, innerVal) <- vertex.props - columnMeta <- ColumnMeta.findByIdAndSeq(serviceColumn.id.get, propKey.toByte, useCache = true) - jsValue <- innerValToJsValue(innerVal, columnMeta.dataType) - } yield { - (columnMeta.name -> jsValue) - } - props.toMap - } - - @deprecated(message = "deprecated", since = "0.2") - def propsToJson(edge: Edge) = { - for { - (seq, v) <- edge.propsWithTs if LabelMeta.isValidSeq(seq) - metaProp <- edge.label.metaPropsMap.get(seq) - jsValue <- innerValToJsValue(v.innerVal, metaProp.dataType) - } yield { - (metaProp.name, jsValue) - } - } - - @deprecated(message = "deprecated", since = "0.2") - def summarizeWithListExclude(queryRequestWithResultLs: Seq[QueryRequestWithResult], exclude: Seq[QueryRequestWithResult]): JsObject = { - val excludeIds = resultInnerIds(exclude).map(innerId => innerId -> true).toMap - - - val groupedEdgesWithRank = (for { - queryRequestWithResult <- queryRequestWithResultLs - (queryRequest, queryResult) = QueryRequestWithResult.unapply(queryRequestWithResult).get - edgeWithScore <- queryResult.edgeWithScoreLs - (edge, score) = EdgeWithScore.unapply(edgeWithScore).get - if !excludeIds.contains(toHashKey(edge, queryRequest.queryParam, queryRequest.query.filterOutFields)) - } yield { - (edge, score) - }).groupBy { case (edge, score) => - (edge.label.tgtColumn, edge.label.srcColumn, edge.tgtVertex.innerId) - } - - val jsons = for { - ((tgtColumn, srcColumn, target), edgesAndRanks) <- groupedEdgesWithRank - (edges, ranks) = edgesAndRanks.groupBy(x => x._1.srcVertex).map(_._2.head).unzip - tgtId <- innerValToJsValue(target, tgtColumn.columnType) - } yield { - Json.obj(tgtColumn.columnName -> tgtId, - s"${srcColumn.columnName}s" -> - edges.flatMap(edge => innerValToJsValue(edge.srcVertex.innerId, srcColumn.columnType)), "scoreSum" -> ranks.sum) - } - val sortedJsons = jsons.toList.sortBy { jsObj => (jsObj \ "scoreSum").as[Double] }.reverse - if (queryRequestWithResultLs.isEmpty) { - Json.obj("size" -> sortedJsons.size, "results" -> sortedJsons) - } else { - Json.obj("size" -> sortedJsons.size, "results" -> sortedJsons, - "impressionId" -> queryRequestWithResultLs.head.queryRequest.query.impressionId()) - } - - } - - -}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2core/src/main/scala/com/kakao/s2graph/core/QueryParam.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/QueryParam.scala b/s2core/src/main/scala/com/kakao/s2graph/core/QueryParam.scala deleted file mode 100644 index 0effa07..0000000 --- a/s2core/src/main/scala/com/kakao/s2graph/core/QueryParam.scala +++ /dev/null @@ -1,583 +0,0 @@ -package com.kakao.s2graph.core - -import com.google.common.hash.Hashing -import com.kakao.s2graph.core.mysqls._ -import com.kakao.s2graph.core.parsers.{Where, WhereParser} -import com.kakao.s2graph.core.types._ -import org.apache.hadoop.hbase.util.Bytes -import org.hbase.async.ColumnRangeFilter -import play.api.libs.json.{JsNull, JsNumber, JsValue, Json} - -import scala.util.hashing.MurmurHash3 -import scala.util.{Success, Try} - -object Query { - val initialScore = 1.0 - lazy val empty = Query() - - def toQuery(srcVertices: Seq[Vertex], queryParam: QueryParam) = Query(srcVertices, Vector(Step(List(queryParam)))) - - object DuplicatePolicy extends Enumeration { - type DuplicatePolicy = Value - val First, Sum, CountSum, Raw = Value - - def apply(policy: String): Value = { - policy match { - case "sum" => Query.DuplicatePolicy.Sum - case "countSum" => Query.DuplicatePolicy.CountSum - case "raw" => Query.DuplicatePolicy.Raw - case _ => DuplicatePolicy.First - } - } - } -} - -case class MultiQuery(queries: Seq[Query], - weights: Seq[Double], - queryOption: QueryOption, - jsonQuery: JsValue = JsNull) - -case class QueryOption(removeCycle: Boolean = false, - selectColumns: Seq[String] = Seq.empty, - groupByColumns: Seq[String] = Seq.empty, - orderByColumns: Seq[(String, Boolean)] = Seq.empty, - filterOutQuery: Option[Query] = None, - filterOutFields: Seq[String] = Seq(LabelMeta.to.name), - withScore: Boolean = true, - returnTree: Boolean = false, - limitOpt: Option[Int] = None, - returnAgg: Boolean = true, - scoreThreshold: Double = Double.MinValue, - returnDegree: Boolean = true) - - -case class Query(vertices: Seq[Vertex] = Seq.empty[Vertex], - steps: IndexedSeq[Step] = Vector.empty[Step], - queryOption: QueryOption = QueryOption(), - jsonQuery: JsValue = JsNull) { - - val removeCycle = queryOption.removeCycle - val selectColumns = queryOption.selectColumns -// val groupBy = queryOption.groupBy - val groupByColumns = queryOption.groupByColumns - val orderByColumns = queryOption.orderByColumns - val filterOutQuery = queryOption.filterOutQuery - val filterOutFields = queryOption.filterOutFields - val withScore = queryOption.withScore - val returnTree = queryOption.returnTree - val limitOpt = queryOption.limitOpt - val returnAgg = queryOption.returnAgg - val returnDegree = queryOption.returnDegree - - def cacheKeyBytes: Array[Byte] = { - val selectBytes = Bytes.toBytes(queryOption.selectColumns.toString) - val groupBytes = Bytes.toBytes(queryOption.groupByColumns.toString) - val orderByBytes = Bytes.toBytes(queryOption.orderByColumns.toString) - val filterOutBytes = queryOption.filterOutQuery.map(_.cacheKeyBytes).getOrElse(Array.empty[Byte]) - val returnTreeBytes = Bytes.toBytes(queryOption.returnTree) - - Seq(selectBytes, groupBytes, orderByBytes, filterOutBytes, returnTreeBytes).foldLeft(Array.empty[Byte])(Bytes.add) - } - - lazy val selectColumnsSet = queryOption.selectColumns.map { c => - if (c == "_from") "from" - else if (c == "_to") "to" - else c - }.toSet - - /** return logical query id without considering parameter values */ - def templateId(): JsValue = { - Json.toJson(for { - step <- steps - queryParam <- step.queryParams.sortBy(_.labelWithDir.labelId) - } yield { - Json.obj("label" -> queryParam.label.label, "direction" -> GraphUtil.fromDirection(queryParam.labelWithDir.dir)) - }) - } - - def impressionId(): JsNumber = { - val hash = MurmurHash3.stringHash(templateId().toString()) - JsNumber(hash) - } - - def cursorStrings(): Seq[Seq[String]] = { - //Don`t know how to replace all cursor keys in json - steps.map { step => - step.queryParams.map { queryParam => - queryParam.cursorOpt.getOrElse("") - } - } - } -} - -object EdgeTransformer { - val DefaultTransformField = Json.arr("_to") - val DefaultTransformFieldAsList = Json.arr("_to").as[List[String]] - val DefaultJson = Json.arr(DefaultTransformField) -} - -/** - * TODO: step wise outputFields should be used with nextStepLimit, nextStepThreshold. - * @param jsValue - */ -case class EdgeTransformer(queryParam: QueryParam, jsValue: JsValue) { - val Delimiter = "\\$" - val targets = jsValue.asOpt[List[Vector[String]]].toList - val fieldsLs = for { - target <- targets - fields <- target - } yield fields - val isDefault = fieldsLs.size == 1 && fieldsLs.head.size == 1 && (fieldsLs.head.head == "_to" || fieldsLs.head.head == "to") - - def toHashKeyBytes: Array[Byte] = if (isDefault) Array.empty[Byte] else Bytes.toBytes(jsValue.toString) - - def replace(fmt: String, - values: Seq[InnerValLike], - nextStepOpt: Option[Step]): Seq[InnerValLike] = { - - val tokens = fmt.split(Delimiter) - val _values = values.padTo(tokens.length, InnerVal.withStr("", queryParam.label.schemaVersion)) - val mergedStr = tokens.zip(_values).map { case (prefix, innerVal) => prefix + innerVal.toString }.mkString - // logger.error(s"${tokens.toList}, ${values}, $mergedStr") - // println(s"${tokens.toList}, ${values}, $mergedStr") - nextStepOpt match { - case None => - val columnType = - if (queryParam.labelWithDir.dir == GraphUtil.directions("out")) queryParam.label.tgtColumnType - else queryParam.label.srcColumnType - - if (columnType == InnerVal.STRING) Seq(InnerVal.withStr(mergedStr, queryParam.label.schemaVersion)) - else Nil - case Some(nextStep) => - val nextQueryParamsValid = nextStep.queryParams.filter { qParam => - if (qParam.labelWithDir.dir == GraphUtil.directions("out")) qParam.label.srcColumnType == "string" - else qParam.label.tgtColumnType == "string" - } - for { - nextQueryParam <- nextQueryParamsValid - } yield { - InnerVal.withStr(mergedStr, nextQueryParam.label.schemaVersion) - } - } - } - - def toInnerValOpt(edge: Edge, fieldName: String): Option[InnerValLike] = { - fieldName match { - case LabelMeta.to.name => Option(edge.tgtVertex.innerId) - case LabelMeta.from.name => Option(edge.srcVertex.innerId) - case _ => - for { - labelMeta <- queryParam.label.metaPropsInvMap.get(fieldName) - value <- edge.propsWithTs.get(labelMeta.seq) - } yield value.innerVal - } - } - - def transform(edge: Edge, nextStepOpt: Option[Step]): Seq[Edge] = { - if (isDefault) Seq(edge) - else { - val edges = for { - fields <- fieldsLs - innerVal <- { - if (fields.size == 1) { - val fieldName = fields.head - toInnerValOpt(edge, fieldName).toSeq - } else { - val fmt +: fieldNames = fields - replace(fmt, fieldNames.flatMap(fieldName => toInnerValOpt(edge, fieldName)), nextStepOpt) - } - } - } yield edge.updateTgtVertex(innerVal).copy(originalEdgeOpt = Option(edge)) - - - edges - } - } -} - -object Step { - val Delimiter = "|" -} - -case class Step(queryParams: List[QueryParam], - labelWeights: Map[Int, Double] = Map.empty, - // scoreThreshold: Double = 0.0, - nextStepScoreThreshold: Double = 0.0, - nextStepLimit: Int = -1, - cacheTTL: Long = -1) { - - lazy val excludes = queryParams.filter(_.exclude) - lazy val includes = queryParams.filterNot(_.exclude) - lazy val excludeIds = excludes.map(x => x.labelWithDir.labelId -> true).toMap - - def toCacheKey(lss: Seq[Long]): Long = Hashing.murmur3_128().hashBytes(toCacheKeyRaw(lss)).asLong() -// MurmurHash3.bytesHash(toCacheKeyRaw(lss)) - - def toCacheKeyRaw(lss: Seq[Long]): Array[Byte] = { - var bytes = Array.empty[Byte] - lss.sorted.foreach { h => bytes = Bytes.add(bytes, Bytes.toBytes(h)) } - bytes - } -} - -case class VertexParam(vertices: Seq[Vertex]) { - var filters: Option[Map[Byte, InnerValLike]] = None - - def has(what: Option[Map[Byte, InnerValLike]]): VertexParam = { - what match { - case None => this - case Some(w) => has(w) - } - } - - def has(what: Map[Byte, InnerValLike]): VertexParam = { - this.filters = Some(what) - this - } - -} - -//object RankParam { -// def apply(labelId: Int, keyAndWeights: Seq[(Byte, Double)]) = { -// new RankParam(labelId, keyAndWeights) -// } -//} - -case class RankParam(labelId: Int, var keySeqAndWeights: Seq[(Byte, Double)] = Seq.empty[(Byte, Double)]) { - // empty => Count - lazy val rankKeysWeightsMap = keySeqAndWeights.toMap - - def defaultKey() = { - this.keySeqAndWeights = List((LabelMeta.countSeq, 1.0)) - this - } - - def toHashKeyBytes(): Array[Byte] = { - var bytes = Array.empty[Byte] - keySeqAndWeights.map { case (key, weight) => - bytes = Bytes.add(bytes, Array.fill(1)(key), Bytes.toBytes(weight)) - } - bytes - } -} - -object QueryParam { - lazy val Empty = QueryParam(LabelWithDirection(0, 0)) - lazy val DefaultThreshold = Double.MinValue - val Delimiter = "," -} - -case class QueryParam(labelWithDir: LabelWithDirection, timestamp: Long = System.currentTimeMillis()) { - - import HBaseSerializable._ - import Query.DuplicatePolicy - import Query.DuplicatePolicy._ - - lazy val label = Label.findById(labelWithDir.labelId) - val DefaultKey = LabelIndex.DefaultSeq - val fullKey = DefaultKey - - var labelOrderSeq = fullKey - - var sample = -1 - var limit = 10 - var offset = 0 - var rank = new RankParam(labelWithDir.labelId, List(LabelMeta.countSeq -> 1)) - - var duration: Option[(Long, Long)] = None - var isInverted: Boolean = false - - var columnRangeFilter: ColumnRangeFilter = null - - var hasFilters: Map[Byte, InnerValLike] = Map.empty[Byte, InnerValLike] - var where: Try[Where] = Success(WhereParser.success) - var whereRawOpt: Option[String] = None - var duplicatePolicy = DuplicatePolicy.First - var rpcTimeoutInMillis = 1000 - var maxAttempt = 2 - var includeDegree = false - var tgtVertexInnerIdOpt: Option[InnerValLike] = None - var cacheTTLInMillis: Long = -1L - var threshold = QueryParam.DefaultThreshold - var timeDecay: Option[TimeDecay] = None - var transformer: EdgeTransformer = EdgeTransformer(this, EdgeTransformer.DefaultJson) - var scorePropagateOp: String = "multiply" - var exclude = false - var include = false - var shouldNormalize= false - var cursorOpt: Option[String] = None - - var columnRangeFilterMinBytes = Array.empty[Byte] - var columnRangeFilterMaxBytes = Array.empty[Byte] - - lazy val srcColumnWithDir = label.srcColumnWithDir(labelWithDir.dir) - lazy val tgtColumnWithDir = label.tgtColumnWithDir(labelWithDir.dir) - - def toBytes(idxSeq: Byte, offset: Int, limit: Int, isInverted: Boolean): Array[Byte] = { - val front = Array[Byte](idxSeq, if (isInverted) 1.toByte else 0.toByte) - Bytes.add(front, Bytes.toBytes((offset.toLong << 32 | limit))) - } - - /** - * consider only I/O specific parameters. - * properties that is used on Graph.filterEdges should not be considered. - * @param bytes - * @return - */ - def toCacheKey(bytes: Array[Byte]): Long = { - val hashBytes = toCacheKeyRaw(bytes) - Hashing.murmur3_128().hashBytes(hashBytes).asLong() -// MurmurHash3.bytesHash(hashBytes) - } - - def toCacheKeyRaw(bytes: Array[Byte]): Array[Byte] = { - val transformBytes = transformer.toHashKeyBytes - //TODO: change this to binrary format. - val whereBytes = Bytes.toBytes(whereRawOpt.getOrElse("")) - val durationBytes = duration.map { case (min, max) => - val minTs = min / cacheTTLInMillis - val maxTs = max / cacheTTLInMillis - Bytes.add(Bytes.toBytes(minTs), Bytes.toBytes(maxTs)) - } getOrElse Array.empty[Byte] -// Bytes.toBytes(duration.toString) - val conditionBytes = Bytes.add(transformBytes, whereBytes, durationBytes) - Bytes.add(Bytes.add(bytes, labelWithDir.bytes, toBytes(labelOrderSeq, offset, limit, isInverted)), rank.toHashKeyBytes(), - Bytes.add(columnRangeFilterMinBytes, columnRangeFilterMaxBytes, conditionBytes)) - } - - def isInverted(isInverted: Boolean): QueryParam = { - this.isInverted = isInverted - this - } - - def labelOrderSeq(labelOrderSeq: Byte): QueryParam = { - this.labelOrderSeq = labelOrderSeq - this - } - - def sample(n: Int): QueryParam = { - this.sample = n - this - } - - def limit(offset: Int, limit: Int): QueryParam = { - /** since degree info is located on first always */ - if (offset == 0 && this.columnRangeFilter == null) { - this.limit = limit + 1 - this.offset = offset - } else { - this.limit = limit - this.offset = offset + 1 - } - // this.columnPaginationFilter = new ColumnPaginationFilter(this.limit, this.offset) - this - } - - def interval(fromTo: Option[(Seq[(Byte, InnerValLike)], Seq[(Byte, InnerValLike)])]): QueryParam = { - fromTo match { - case Some((from, to)) => interval(from, to) - case _ => this - } - } - - def interval(from: Seq[(Byte, InnerValLike)], to: Seq[(Byte, InnerValLike)]): QueryParam = { - // val len = label.orderTypes.size.toByte - // val len = label.extraIndicesMap(labelOrderSeq).sortKeyTypes.size.toByte - // logger.error(s"indicesMap: ${label.indicesMap(labelOrderSeq)}") - val len = label.indicesMap(labelOrderSeq).sortKeyTypes.size.toByte - - val minMetaByte = InnerVal.minMetaByte - // val maxMetaByte = InnerVal.maxMetaByte - val maxMetaByte = -1.toByte - val toVal = Bytes.add(propsToBytes(to), Array.fill(1)(minMetaByte)) - //FIXME - val fromVal = Bytes.add(propsToBytes(from), Array.fill(10)(maxMetaByte)) - toVal(0) = len - fromVal(0) = len - val maxBytes = fromVal - val minBytes = toVal - this.columnRangeFilterMaxBytes = maxBytes - this.columnRangeFilterMinBytes = minBytes - val rangeFilter = new ColumnRangeFilter(minBytes, true, maxBytes, true) - this.columnRangeFilter = rangeFilter - this - } - - def duration(minMaxTs: Option[(Long, Long)]): QueryParam = { - minMaxTs match { - case Some((minTs, maxTs)) => duration(minTs, maxTs) - case _ => this - } - } - - def duration(minTs: Long, maxTs: Long): QueryParam = { - this.duration = Some((minTs, maxTs)) - this - } - - def rank(r: RankParam): QueryParam = { - this.rank = r - this - } - - def exclude(filterOut: Boolean): QueryParam = { - this.exclude = filterOut - this - } - - def include(filterIn: Boolean): QueryParam = { - this.include = filterIn - this - } - - def has(hasFilters: Map[Byte, InnerValLike]): QueryParam = { - this.hasFilters = hasFilters - this - } - - def where(whereTry: Try[Where]): QueryParam = { - this.where = whereTry - this - } - - def duplicatePolicy(policy: Option[DuplicatePolicy]): QueryParam = { - this.duplicatePolicy = policy.getOrElse(DuplicatePolicy.First) - this - } - - def rpcTimeout(millis: Int): QueryParam = { - this.rpcTimeoutInMillis = millis - this - } - - def maxAttempt(attempt: Int): QueryParam = { - this.maxAttempt = attempt - this - } - - def includeDegree(includeDegree: Boolean): QueryParam = { - this.includeDegree = includeDegree - this - } - - def tgtVertexInnerIdOpt(other: Option[InnerValLike]): QueryParam = { - this.tgtVertexInnerIdOpt = other - this - } - - def cacheTTLInMillis(other: Long): QueryParam = { - this.cacheTTLInMillis = other - this - } - - def timeDecay(other: Option[TimeDecay]): QueryParam = { - this.timeDecay = other - this - } - - def threshold(other: Double): QueryParam = { - this.threshold = other - this - } - - def transformer(other: Option[JsValue]): QueryParam = { - other match { - case Some(js) => this.transformer = EdgeTransformer(this, js) - case None => - } - this - } - - def scorePropagateOp(scorePropagateOp: String): QueryParam = { - this.scorePropagateOp = scorePropagateOp - this - } - - def shouldNormalize(shouldNormalize: Boolean): QueryParam = { - this.shouldNormalize = shouldNormalize - this - } - - def whereRawOpt(sqlOpt: Option[String]): QueryParam = { - this.whereRawOpt = sqlOpt - this - } - - def cursorOpt(cursorOpt: Option[String]): QueryParam = { - this.cursorOpt = cursorOpt - this - } - - def isSnapshotEdge = tgtVertexInnerIdOpt.isDefined - - override def toString = { - List(label.label, labelOrderSeq, offset, limit, rank, - duration, isInverted, exclude, include, hasFilters).mkString("\t") - // duration, isInverted, exclude, include, hasFilters, outputFields).mkString("\t") - } - - // - // def buildGetRequest(srcVertex: Vertex) = { - // val (srcColumn, tgtColumn) = label.srcTgtColumn(labelWithDir.dir) - // val (srcInnerId, tgtInnerId) = tgtVertexInnerIdOpt match { - // case Some(tgtVertexInnerId) => // _to is given. - // /** we use toInvertedEdgeHashLike so dont need to swap src, tgt */ - // val src = InnerVal.convertVersion(srcVertex.innerId, srcColumn.columnType, label.schemaVersion) - // val tgt = InnerVal.convertVersion(tgtVertexInnerId, tgtColumn.columnType, label.schemaVersion) - // (src, tgt) - // case None => - // val src = InnerVal.convertVersion(srcVertex.innerId, srcColumn.columnType, label.schemaVersion) - // (src, src) - // } - // - // val (srcVId, tgtVId) = (SourceVertexId(srcColumn.id.get, srcInnerId), TargetVertexId(tgtColumn.id.get, tgtInnerId)) - // val (srcV, tgtV) = (Vertex(srcVId), Vertex(tgtVId)) - // val edge = Edge(srcV, tgtV, labelWithDir) - // - // val get = if (tgtVertexInnerIdOpt.isDefined) { - // val snapshotEdge = edge.toInvertedEdgeHashLike - // val kv = snapshotEdge.kvs.head - // new GetRequest(label.hbaseTableName.getBytes, kv.row, edgeCf, kv.qualifier) - // } else { - // val indexedEdgeOpt = edge.edgesWithIndex.find(e => e.labelIndexSeq == labelOrderSeq) - // assert(indexedEdgeOpt.isDefined) - // val indexedEdge = indexedEdgeOpt.get - // val kv = indexedEdge.kvs.head - // val table = label.hbaseTableName.getBytes - // //kv.table // - // val rowKey = kv.row // indexedEdge.rowKey.bytes - // val cf = edgeCf - // new GetRequest(table, rowKey, cf) - // } - // - // val (minTs, maxTs) = duration.getOrElse((0L, Long.MaxValue)) - // - // get.maxVersions(1) - // get.setFailfast(true) - // get.setMaxResultsPerColumnFamily(limit) - // get.setRowOffsetPerColumnFamily(offset) - // get.setMinTimestamp(minTs) - // get.setMaxTimestamp(maxTs) - // get.setTimeout(rpcTimeoutInMillis) - // if (columnRangeFilter != null) get.setFilter(columnRangeFilter) - // // get.setMaxAttempt(maxAttempt.toByte) - // // get.setRpcTimeout(rpcTimeoutInMillis) - // - // // if (columnRangeFilter != null) get.filter(columnRangeFilter) - // // logger.debug(s"Get: $get, $offset, $limit") - // - // get - // } -} - -case class TimeDecay(initial: Double = 1.0, - lambda: Double = 0.1, - timeUnit: Double = 60 * 60 * 24, - labelMetaSeq: Byte = LabelMeta.timeStampSeq) { - def decay(diff: Double): Double = { - //FIXME - val ret = initial * Math.pow(1.0 - lambda, diff / timeUnit) - // logger.debug(s"$initial, $lambda, $timeUnit, $diff, ${diff / timeUnit}, $ret") - ret - } -} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2core/src/main/scala/com/kakao/s2graph/core/QueryResult.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/QueryResult.scala b/s2core/src/main/scala/com/kakao/s2graph/core/QueryResult.scala deleted file mode 100644 index 02d9736..0000000 --- a/s2core/src/main/scala/com/kakao/s2graph/core/QueryResult.scala +++ /dev/null @@ -1,42 +0,0 @@ -package com.kakao.s2graph.core - -import com.kakao.s2graph.core.mysqls.LabelMeta -import com.kakao.s2graph.core.types.{InnerVal, InnerValLikeWithTs} - -import scala.collection.Seq - -object QueryResult { - def fromVertices(query: Query): Seq[QueryRequestWithResult] = { - if (query.steps.isEmpty || query.steps.head.queryParams.isEmpty) { - Seq.empty - } else { - val queryParam = query.steps.head.queryParams.head - val label = queryParam.label - val currentTs = System.currentTimeMillis() - val propsWithTs = Map(LabelMeta.timeStampSeq -> - InnerValLikeWithTs(InnerVal.withLong(currentTs, label.schemaVersion), currentTs)) - for { - vertex <- query.vertices - } yield { - val edge = Edge(vertex, vertex, queryParam.labelWithDir, propsWithTs = propsWithTs) - val edgeWithScore = EdgeWithScore(edge, Graph.DefaultScore) - QueryRequestWithResult(QueryRequest(query, -1, vertex, queryParam), - QueryResult(edgeWithScoreLs = Seq(edgeWithScore))) - } - } - } -} -case class QueryRequestWithResult(queryRequest: QueryRequest, queryResult: QueryResult) - -case class QueryRequest(query: Query, - stepIdx: Int, - vertex: Vertex, - queryParam: QueryParam) - - -case class QueryResult(edgeWithScoreLs: Seq[EdgeWithScore] = Nil, - tailCursor: Array[Byte] = Array.empty, - timestamp: Long = System.currentTimeMillis(), - isFailure: Boolean = false) - -case class EdgeWithScore(edge: Edge, score: Double) http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2core/src/main/scala/com/kakao/s2graph/core/Vertex.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/Vertex.scala b/s2core/src/main/scala/com/kakao/s2graph/core/Vertex.scala deleted file mode 100644 index c2b86c2..0000000 --- a/s2core/src/main/scala/com/kakao/s2graph/core/Vertex.scala +++ /dev/null @@ -1,107 +0,0 @@ -package com.kakao.s2graph.core - - -import com.kakao.s2graph.core.mysqls._ - -//import com.kakao.s2graph.core.models._ - -import com.kakao.s2graph.core.types._ -import play.api.libs.json.Json - -/** - */ -case class Vertex(id: VertexId, - ts: Long = System.currentTimeMillis(), - props: Map[Int, InnerValLike] = Map.empty[Int, InnerValLike], - op: Byte = 0, - belongLabelIds: Seq[Int] = Seq.empty) extends GraphElement { - - val innerId = id.innerId - - def schemaVer = serviceColumn.schemaVersion - - def serviceColumn = ServiceColumn.findById(id.colId) - - def service = Service.findById(serviceColumn.serviceId) - - lazy val (hbaseZkAddr, hbaseTableName) = (service.cluster, service.hTableName) - - def defaultProps = Map(ColumnMeta.lastModifiedAtColumnSeq.toInt -> InnerVal.withLong(ts, schemaVer)) - - // lazy val kvs = Graph.client.vertexSerializer(this).toKeyValues - - /** TODO: make this as configurable */ - override def serviceName = service.serviceName - - override def isAsync = false - - override def queueKey = Seq(ts.toString, serviceName).mkString("|") - - override def queuePartitionKey = id.innerId.toString - - def propsWithName = for { - (seq, v) <- props - meta <- ColumnMeta.findByIdAndSeq(id.colId, seq.toByte) - } yield (meta.name -> v.toString) - - // /** only used by bulk loader */ - // def buildPuts(): List[Put] = { - // // logger.error(s"put: $this => $rowKey") - //// val put = new Put(rowKey.bytes) - //// for ((q, v) <- qualifiersWithValues) { - //// put.addColumn(vertexCf, q, ts, v) - //// } - //// List(put) - // val kv = kvs.head - // val put = new Put(kv.row) - // kvs.map { kv => - // put.addColumn(kv.cf, kv.qualifier, kv.timestamp, kv.value) - // } - // List(put) - // } - - def toEdgeVertex() = Vertex(SourceVertexId(id.colId, innerId), ts, props, op) - - - override def hashCode() = { - val hash = id.hashCode() - // logger.debug(s"Vertex.hashCode: $this -> $hash") - hash - } - - override def equals(obj: Any) = { - obj match { - case otherVertex: Vertex => - val ret = id == otherVertex.id - // logger.debug(s"Vertex.equals: $this, $obj => $ret") - ret - case _ => false - } - } - - def withProps(newProps: Map[Int, InnerValLike]) = Vertex(id, ts, newProps, op) - - def toLogString(): String = { - val (serviceName, columnName) = - if (!id.storeColId) ("", "") - else (serviceColumn.service.serviceName, serviceColumn.columnName) - - if (propsWithName.nonEmpty) - Seq(ts, GraphUtil.fromOp(op), "v", id.innerId, serviceName, columnName, Json.toJson(propsWithName)).mkString("\t") - else - Seq(ts, GraphUtil.fromOp(op), "v", id.innerId, serviceName, columnName).mkString("\t") - } -} - -object Vertex { - - def toPropKey(labelId: Int): Int = Byte.MaxValue + labelId - - def toLabelId(propKey: Int): Int = propKey - Byte.MaxValue - - def isLabelId(propKey: Int): Boolean = propKey > Byte.MaxValue - - // val emptyVertex = Vertex(new CompositeId(CompositeId.defaultColId, CompositeId.defaultInnerId, false, true), - // System.currentTimeMillis()) - def fromString(s: String): Option[Vertex] = Graph.toVertex(s) -} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2core/src/main/scala/com/kakao/s2graph/core/mysqls/Bucket.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/mysqls/Bucket.scala b/s2core/src/main/scala/com/kakao/s2graph/core/mysqls/Bucket.scala deleted file mode 100644 index edc6147..0000000 --- a/s2core/src/main/scala/com/kakao/s2graph/core/mysqls/Bucket.scala +++ /dev/null @@ -1,66 +0,0 @@ -package com.kakao.s2graph.core.mysqls - -/** - * Created by shon on 8/5/15. - */ - -import scalikejdbc._ - -object Bucket extends Model[Bucket] { - - val rangeDelimiter = "~" - val INVALID_BUCKET_EXCEPTION = new RuntimeException("invalid bucket.") - - def apply(rs: WrappedResultSet): Bucket = { - Bucket(rs.intOpt("id"), - rs.int("experiment_id"), - rs.string("modular"), - rs.string("http_verb"), - rs.string("api_path"), - rs.string("request_body"), - rs.int("timeout"), - rs.string("impression_id"), - rs.boolean("is_graph_query"), - rs.boolean("is_empty")) - } - - def finds(experimentId: Int)(implicit session: DBSession = AutoSession): List[Bucket] = { - val cacheKey = "experimentId=" + experimentId - withCaches(cacheKey) { - sql"""select * from buckets where experiment_id = $experimentId""" - .map { rs => Bucket(rs) }.list().apply() - } - } - - def toRange(str: String): Option[(Int, Int)] = { - val range = str.split(rangeDelimiter) - if (range.length == 2) Option(range.head.toInt, range.last.toInt) - else None - } - - def findByImpressionId(impressionId: String, useCache: Boolean = true)(implicit session: DBSession = AutoSession): Option[Bucket] = { - val cacheKey = "impressionId=" + impressionId - val sql = sql"""select * from buckets where impression_id=$impressionId""" - .map { rs => Bucket(rs)} - if (useCache) { - withCache(cacheKey) { - sql.single().apply() - } - } else { - sql.single().apply() - } - } -} - -case class Bucket(id: Option[Int], - experimentId: Int, - modular: String, - httpVerb: String, apiPath: String, - requestBody: String, timeout: Int, impressionId: String, - isGraphQuery: Boolean = true, - isEmpty: Boolean = false) { - - import Bucket._ - - lazy val rangeOpt = toRange(modular) -} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2core/src/main/scala/com/kakao/s2graph/core/mysqls/ColumnMeta.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/mysqls/ColumnMeta.scala b/s2core/src/main/scala/com/kakao/s2graph/core/mysqls/ColumnMeta.scala deleted file mode 100644 index 1b6d55f..0000000 --- a/s2core/src/main/scala/com/kakao/s2graph/core/mysqls/ColumnMeta.scala +++ /dev/null @@ -1,110 +0,0 @@ -package com.kakao.s2graph.core.mysqls - -import play.api.libs.json.Json -import scalikejdbc._ - -object ColumnMeta extends Model[ColumnMeta] { - - val timeStampSeq = 0.toByte - val countSeq = -1.toByte - val lastModifiedAtColumnSeq = 0.toByte - val lastModifiedAtColumn = ColumnMeta(Some(0), 0, "lastModifiedAt", lastModifiedAtColumnSeq, "long") - val maxValue = Byte.MaxValue - - def apply(rs: WrappedResultSet): ColumnMeta = { - ColumnMeta(Some(rs.int("id")), rs.int("column_id"), rs.string("name"), rs.byte("seq"), rs.string("data_type").toLowerCase()) - } - - def findById(id: Int)(implicit session: DBSession = AutoSession) = { - // val cacheKey = s"id=$id" - val cacheKey = "id=" + id - withCache(cacheKey) { - sql"""select * from column_metas where id = ${id}""".map { rs => ColumnMeta(rs) }.single.apply - }.get - } - - def findAllByColumn(columnId: Int, useCache: Boolean = true)(implicit session: DBSession = AutoSession) = { - // val cacheKey = s"columnId=$columnId" - val cacheKey = "columnId=" + columnId - if (useCache) { - withCaches(cacheKey)( sql"""select *from column_metas where column_id = ${columnId} order by seq ASC""" - .map { rs => ColumnMeta(rs) }.list.apply()) - } else { - sql"""select * from column_metas where column_id = ${columnId} order by seq ASC""" - .map { rs => ColumnMeta(rs) }.list.apply() - } - } - - def findByName(columnId: Int, name: String)(implicit session: DBSession = AutoSession) = { - // val cacheKey = s"columnId=$columnId:name=$name" - val cacheKey = "columnId=" + columnId + ":name=" + name - withCache(cacheKey)( sql"""select * from column_metas where column_id = ${columnId} and name = ${name}""" - .map { rs => ColumnMeta(rs) }.single.apply()) - } - - def insert(columnId: Int, name: String, dataType: String)(implicit session: DBSession = AutoSession) = { - val ls = findAllByColumn(columnId, false) - val seq = ls.size + 1 - if (seq <= maxValue) { - sql"""insert into column_metas(column_id, name, seq, data_type) - select ${columnId}, ${name}, ${seq}, ${dataType}""" - .updateAndReturnGeneratedKey.apply() - } - } - - def findOrInsert(columnId: Int, name: String, dataType: String)(implicit session: DBSession = AutoSession): ColumnMeta = { - findByName(columnId, name) match { - case Some(c) => c - case None => - insert(columnId, name, dataType) - expireCache(s"columnId=$columnId:name=$name") - findByName(columnId, name).get - } - } - - def findByIdAndSeq(columnId: Int, seq: Byte, useCache: Boolean = true)(implicit session: DBSession = AutoSession) = { - val cacheKey = "columnId=" + columnId + ":seq=" + seq - lazy val columnMetaOpt = sql""" - select * from column_metas where column_id = ${columnId} and seq = ${seq} - """.map { rs => ColumnMeta(rs) }.single.apply() - - if (useCache) withCache(cacheKey)(columnMetaOpt) - else columnMetaOpt - } - - def delete(id: Int)(implicit session: DBSession = AutoSession) = { - val columnMeta = findById(id) - val (columnId, name) = (columnMeta.columnId, columnMeta.name) - sql"""delete from column_metas where id = ${id}""".execute.apply() - val cacheKeys = List(s"id=$id", s"columnId=$columnId:name=$name", s"colunmId=$columnId") - cacheKeys.foreach { key => - expireCache(key) - expireCaches(key) - } - } - - def findAll()(implicit session: DBSession = AutoSession) = { - val ls = sql"""select * from column_metas""".map { rs => ColumnMeta(rs) }.list().apply() - - putsToCache(ls.map { x => - val cacheKey = s"id=${x.id.get}" - (cacheKey -> x) - }) - putsToCache(ls.map { x => - val cacheKey = s"columnId=${x.columnId}:name=${x.name}" - (cacheKey -> x) - }) - putsToCache(ls.map { x => - val cacheKey = s"columnId=${x.columnId}:seq=${x.seq}" - (cacheKey -> x) - }) - putsToCaches(ls.groupBy(x => x.columnId).map { case (columnId, ls) => - val cacheKey = s"columnId=${columnId}" - (cacheKey -> ls) - }.toList) - } -} - -case class ColumnMeta(id: Option[Int], columnId: Int, name: String, seq: Byte, dataType: String) { - lazy val toJson = Json.obj("name" -> name, "dataType" -> dataType) -} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2core/src/main/scala/com/kakao/s2graph/core/mysqls/Experiment.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/mysqls/Experiment.scala b/s2core/src/main/scala/com/kakao/s2graph/core/mysqls/Experiment.scala deleted file mode 100644 index 46b92ab..0000000 --- a/s2core/src/main/scala/com/kakao/s2graph/core/mysqls/Experiment.scala +++ /dev/null @@ -1,78 +0,0 @@ -package com.kakao.s2graph.core.mysqls - -import com.kakao.s2graph.core.GraphUtil -import com.kakao.s2graph.core.utils.logger -import scalikejdbc._ - -import scala.util.Random - -object Experiment extends Model[Experiment] { - val impressionKey = "S2-Impression-Id" - - def apply(rs: WrappedResultSet): Experiment = { - Experiment(rs.intOpt("id"), - rs.int("service_id"), - rs.string("name"), - rs.string("description"), - rs.string("experiment_type"), - rs.int("total_modular")) - } - - def finds(serviceId: Int)(implicit session: DBSession = AutoSession): List[Experiment] = { - val cacheKey = "serviceId=" + serviceId - withCaches(cacheKey) { - sql"""select * from experiments where service_id = ${serviceId}""" - .map { rs => Experiment(rs) }.list().apply() - } - } - - def findBy(serviceId: Int, name: String)(implicit session: DBSession = AutoSession): Option[Experiment] = { - val cacheKey = "serviceId=" + serviceId + ":name=" + name - withCache(cacheKey) { - sql"""select * from experiments where service_id = ${serviceId} and name = ${name}""" - .map { rs => Experiment(rs) }.single.apply - } - } - - def findById(id: Int)(implicit session: DBSession = AutoSession): Option[Experiment] = { - val cacheKey = "id=" + id - withCache(cacheKey)( - sql"""select * from experiments where id = ${id}""" - .map { rs => Experiment(rs) }.single.apply - ) - } -} - -case class Experiment(id: Option[Int], - serviceId: Int, - name: String, - description: String, - experimentType: String, - totalModular: Int) { - - def buckets = Bucket.finds(id.get) - - def rangeBuckets = for { - bucket <- buckets - range <- bucket.rangeOpt - } yield range -> bucket - - - def findBucket(uuid: String, impIdOpt: Option[String] = None): Option[Bucket] = { - impIdOpt match { - case Some(impId) => Bucket.findByImpressionId(impId) - case None => - val seed = experimentType match { - case "u" => (GraphUtil.murmur3(uuid) % totalModular) + 1 - case _ => Random.nextInt(totalModular) + 1 - } - findBucket(seed) - } - } - - def findBucket(uuidMod: Int): Option[Bucket] = { - rangeBuckets.find { case ((from, to), bucket) => - from <= uuidMod && uuidMod <= to - }.map(_._2) - } -} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2core/src/main/scala/com/kakao/s2graph/core/mysqls/Label.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/mysqls/Label.scala b/s2core/src/main/scala/com/kakao/s2graph/core/mysqls/Label.scala deleted file mode 100644 index 005a01e..0000000 --- a/s2core/src/main/scala/com/kakao/s2graph/core/mysqls/Label.scala +++ /dev/null @@ -1,372 +0,0 @@ -package com.kakao.s2graph.core.mysqls - -import com.kakao.s2graph.core.GraphExceptions.ModelNotFoundException -import com.kakao.s2graph.core.Management.JsonModel.{Index, Prop} -import com.kakao.s2graph.core.utils.logger -import com.kakao.s2graph.core.{GraphUtil, JSONParser, Management} -import play.api.libs.json.Json -import scalikejdbc._ - -object Label extends Model[Label] { - - val maxHBaseTableNames = 2 - - def apply(rs: WrappedResultSet): Label = { - Label(Option(rs.int("id")), rs.string("label"), - rs.int("src_service_id"), rs.string("src_column_name"), rs.string("src_column_type"), - rs.int("tgt_service_id"), rs.string("tgt_column_name"), rs.string("tgt_column_type"), - rs.boolean("is_directed"), rs.string("service_name"), rs.int("service_id"), rs.string("consistency_level"), - rs.string("hbase_table_name"), rs.intOpt("hbase_table_ttl"), rs.string("schema_version"), rs.boolean("is_async"), rs.string("compressionAlgorithm")) - } - - def deleteAll(label: Label)(implicit session: DBSession) = { - val id = label.id - LabelMeta.findAllByLabelId(id.get, false).foreach { x => LabelMeta.delete(x.id.get) } - LabelIndex.findByLabelIdAll(id.get, false).foreach { x => LabelIndex.delete(x.id.get) } - Label.delete(id.get) - } - - def findByName(labelName: String, useCache: Boolean = true)(implicit session: DBSession = AutoSession): Option[Label] = { - val cacheKey = "label=" + labelName - lazy val labelOpt = - sql""" - select * - from labels - where label = ${labelName}""".map { rs => Label(rs) }.single.apply() - - if (useCache) withCache(cacheKey)(labelOpt) - else labelOpt - } - - def insert(label: String, - srcServiceId: Int, - srcColumnName: String, - srcColumnType: String, - tgtServiceId: Int, - tgtColumnName: String, - tgtColumnType: String, - isDirected: Boolean, - serviceName: String, - serviceId: Int, - consistencyLevel: String, - hTableName: String, - hTableTTL: Option[Int], - schemaVersion: String, - isAsync: Boolean, - compressionAlgorithm: String)(implicit session: DBSession = AutoSession) = { - sql""" - insert into labels(label, - src_service_id, src_column_name, src_column_type, - tgt_service_id, tgt_column_name, tgt_column_type, - is_directed, service_name, service_id, consistency_level, hbase_table_name, hbase_table_ttl, schema_version, is_async, compressionAlgorithm) - values (${label}, - ${srcServiceId}, ${srcColumnName}, ${srcColumnType}, - ${tgtServiceId}, ${tgtColumnName}, ${tgtColumnType}, - ${isDirected}, ${serviceName}, ${serviceId}, ${consistencyLevel}, ${hTableName}, ${hTableTTL}, - ${schemaVersion}, ${isAsync}, ${compressionAlgorithm}) - """ - .updateAndReturnGeneratedKey.apply() - } - - def findByIdOpt(id: Int)(implicit session: DBSession = AutoSession): Option[Label] = { - val cacheKey = "id=" + id - withCache(cacheKey)( - sql""" - select * - from labels - where id = ${id}""" - .map { rs => Label(rs) }.single.apply()) - } - - def findById(id: Int)(implicit session: DBSession = AutoSession): Label = { - val cacheKey = "id=" + id - withCache(cacheKey)( - sql""" - select * - from labels - where id = ${id}""" - .map { rs => Label(rs) }.single.apply()).get - } - - def findByTgtColumnId(columnId: Int)(implicit session: DBSession = AutoSession): List[Label] = { - val cacheKey = "tgtColumnId=" + columnId - val col = ServiceColumn.findById(columnId) - withCaches(cacheKey)( - sql""" - select * - from labels - where tgt_column_name = ${col.columnName} - and service_id = ${col.serviceId} - """.map { rs => Label(rs) }.list().apply()) - } - - def findBySrcColumnId(columnId: Int)(implicit session: DBSession = AutoSession): List[Label] = { - val cacheKey = "srcColumnId=" + columnId - val col = ServiceColumn.findById(columnId) - withCaches(cacheKey)( - sql""" - select * - from labels - where src_column_name = ${col.columnName} - and service_id = ${col.serviceId} - """.map { rs => Label(rs) }.list().apply()) - } - - def findBySrcServiceId(serviceId: Int)(implicit session: DBSession = AutoSession): List[Label] = { - val cacheKey = "srcServiceId=" + serviceId - withCaches(cacheKey)( - sql"""select * from labels where src_service_id = ${serviceId}""".map { rs => Label(rs) }.list().apply - ) - } - - def findByTgtServiceId(serviceId: Int)(implicit session: DBSession = AutoSession): List[Label] = { - val cacheKey = "tgtServiceId=" + serviceId - withCaches(cacheKey)( - sql"""select * from labels where tgt_service_id = ${serviceId}""".map { rs => Label(rs) }.list().apply - ) - } - - def insertAll(labelName: String, srcServiceName: String, srcColumnName: String, srcColumnType: String, - tgtServiceName: String, tgtColumnName: String, tgtColumnType: String, - isDirected: Boolean = true, - serviceName: String, - indices: Seq[Index], - metaProps: Seq[Prop], - consistencyLevel: String, - hTableName: Option[String], - hTableTTL: Option[Int], - schemaVersion: String, - isAsync: Boolean, - compressionAlgorithm: String)(implicit session: DBSession = AutoSession): Label = { - - val srcServiceOpt = Service.findByName(srcServiceName, useCache = false) - val tgtServiceOpt = Service.findByName(tgtServiceName, useCache = false) - val serviceOpt = Service.findByName(serviceName, useCache = false) - if (srcServiceOpt.isEmpty) throw new RuntimeException(s"source service $srcServiceName is not created.") - if (tgtServiceOpt.isEmpty) throw new RuntimeException(s"target service $tgtServiceName is not created.") - if (serviceOpt.isEmpty) throw new RuntimeException(s"service $serviceName is not created.") - - val newLabel = for { - srcService <- srcServiceOpt - tgtService <- tgtServiceOpt - service <- serviceOpt - } yield { - val srcServiceId = srcService.id.get - val tgtServiceId = tgtService.id.get - val serviceId = service.id.get - - /** insert serviceColumn */ - val srcCol = ServiceColumn.findOrInsert(srcServiceId, srcColumnName, Some(srcColumnType), schemaVersion) - val tgtCol = ServiceColumn.findOrInsert(tgtServiceId, tgtColumnName, Some(tgtColumnType), schemaVersion) - - if (srcCol.columnType != srcColumnType) throw new RuntimeException(s"source service column type not matched ${srcCol.columnType} != ${srcColumnType}") - if (tgtCol.columnType != tgtColumnType) throw new RuntimeException(s"target service column type not matched ${tgtCol.columnType} != ${tgtColumnType}") - - /** create label */ - Label.findByName(labelName, useCache = false).getOrElse { - - val createdId = insert(labelName, srcServiceId, srcColumnName, srcColumnType, - tgtServiceId, tgtColumnName, tgtColumnType, isDirected, serviceName, serviceId, consistencyLevel, - hTableName.getOrElse(service.hTableName), hTableTTL.orElse(service.hTableTTL), schemaVersion, isAsync, compressionAlgorithm).toInt - - val labelMetaMap = metaProps.map { case Prop(propName, defaultValue, dataType) => - val labelMeta = LabelMeta.findOrInsert(createdId, propName, defaultValue, dataType) - (propName -> labelMeta.seq) - }.toMap ++ LabelMeta.reservedMetas.map (labelMeta => labelMeta.name -> labelMeta.seq).toMap - - if (indices.isEmpty) { - // make default index with _PK, _timestamp, 0 - LabelIndex.findOrInsert(createdId, LabelIndex.DefaultName, LabelIndex.DefaultMetaSeqs.toList, "none") - } else { - indices.foreach { index => - val metaSeq = index.propNames.map { name => labelMetaMap(name) } - LabelIndex.findOrInsert(createdId, index.name, metaSeq.toList, "none") - } - } - - val cacheKeys = List(s"id=$createdId", s"label=$labelName") - val ret = findByName(labelName, useCache = false).get - putsToCache(cacheKeys.map(k => k -> ret)) - ret - } - } - - newLabel.getOrElse(throw new RuntimeException("failed to create label")) - } - - def findAll()(implicit session: DBSession = AutoSession) = { - val ls = sql"""select * from labels""".map { rs => Label(rs) }.list().apply() - putsToCache(ls.map { x => - val cacheKey = s"id=${x.id.get}" - (cacheKey -> x) - }) - putsToCache(ls.map { x => - val cacheKey = s"label=${x.label}" - (cacheKey -> x) - }) - } - - def updateName(oldName: String, newName: String)(implicit session: DBSession = AutoSession) = { - logger.info(s"rename label: $oldName -> $newName") - sql"""update labels set label = ${newName} where label = ${oldName}""".update.apply() - } - - def updateHTableName(labelName: String, newHTableName: String)(implicit session: DBSession = AutoSession) = { - logger.info(s"update HTable of label $labelName to $newHTableName") - val cnt = sql"""update labels set hbase_table_name = $newHTableName where label = $labelName""".update().apply() - val label = Label.findByName(labelName, useCache = false).get - - val cacheKeys = List(s"id=${label.id}", s"label=${label.label}") - cacheKeys.foreach { key => - expireCache(key) - expireCaches(key) - } - cnt - } - - def delete(id: Int)(implicit session: DBSession = AutoSession) = { - val label = findById(id) - logger.info(s"delete label: $label") - val cnt = sql"""delete from labels where id = ${label.id.get}""".update().apply() - val cacheKeys = List(s"id=$id", s"label=${label.label}") - cacheKeys.foreach { key => - expireCache(key) - expireCaches(key) - } - cnt - } -} - -case class Label(id: Option[Int], label: String, - srcServiceId: Int, srcColumnName: String, srcColumnType: String, - tgtServiceId: Int, tgtColumnName: String, tgtColumnType: String, - isDirected: Boolean = true, serviceName: String, serviceId: Int, consistencyLevel: String = "strong", - hTableName: String, hTableTTL: Option[Int], - schemaVersion: String, isAsync: Boolean = false, - compressionAlgorithm: String) extends JSONParser { - def metas = LabelMeta.findAllByLabelId(id.get) - - def metaSeqsToNames = metas.map(x => (x.seq, x.name)) toMap - - // lazy val firstHBaseTableName = hbaseTableName.split(",").headOption.getOrElse(Config.HBASE_TABLE_NAME) - lazy val srcService = Service.findById(srcServiceId) - lazy val tgtService = Service.findById(tgtServiceId) - lazy val service = Service.findById(serviceId) - /** - * TODO - * change this to apply hbase table from target serviceName - */ - // lazy val (hbaseZkAddr, hbaseTableName) = (service.cluster, service.tableName.split(",").headOption.getOrElse(Config.HBASE_TABLE_NAME)) - // lazy val (hbaseZkAddr, hbaseTableName) = (Config.HBASE_ZOOKEEPER_QUORUM, hTableName.split(",").headOption.getOrElse(Config.HBASE_TABLE_NAME)) - // lazy val (hbaseZkAddr, hbaseTableName) = (service.cluster, hTableName.split(",").headOption.getOrElse(GraphConnection.getConfVal("hbase.table.name"))) - lazy val (hbaseZkAddr, hbaseTableName) = (service.cluster, hTableName.split(",").head) - - lazy val srcColumn = ServiceColumn.find(srcServiceId, srcColumnName).getOrElse(throw ModelNotFoundException("Source column not found")) - lazy val tgtColumn = ServiceColumn.find(tgtServiceId, tgtColumnName).getOrElse(throw ModelNotFoundException("Target column not found")) - - lazy val direction = if (isDirected) "out" else "undirected" - lazy val defaultIndex = LabelIndex.findByLabelIdAndSeq(id.get, LabelIndex.DefaultSeq) - - //TODO: Make sure this is correct - lazy val indices = LabelIndex.findByLabelIdAll(id.get, useCache = true) - lazy val indicesMap = indices.map(idx => (idx.seq, idx)) toMap - lazy val indexSeqsMap = indices.map(idx => (idx.metaSeqs, idx)) toMap - lazy val indexNameMap = indices.map(idx => (idx.name, idx)) toMap - lazy val extraIndices = indices.filter(idx => defaultIndex.isDefined && idx.id.get != defaultIndex.get.id.get) - // indices filterNot (_.id.get == defaultIndex.get.id.get) - lazy val extraIndicesMap = extraIndices.map(idx => (idx.seq, idx)) toMap - - lazy val metaProps = LabelMeta.reservedMetas.map { m => - if (m == LabelMeta.to) m.copy(dataType = tgtColumnType) - else if (m == LabelMeta.from) m.copy(dataType = srcColumnType) - else m - } ::: LabelMeta.findAllByLabelId(id.get, useCache = true) - - lazy val metaPropsInner = LabelMeta.reservedMetasInner.map { m => - if (m == LabelMeta.to) m.copy(dataType = tgtColumnType) - else if (m == LabelMeta.from) m.copy(dataType = srcColumnType) - else m - } ::: LabelMeta.findAllByLabelId(id.get, useCache = true) - - lazy val metaPropsMap = metaProps.map(x => (x.seq, x)).toMap - lazy val metaPropsInvMap = metaProps.map(x => (x.name, x)).toMap - lazy val metaPropNames = metaProps.map(x => x.name) - lazy val metaPropNamesMap = metaProps.map(x => (x.seq, x.name)) toMap - - /** this is used only by edgeToProps */ - lazy val metaPropsDefaultMap = (for { - prop <- metaProps if LabelMeta.isValidSeq(prop.seq) - jsValue <- innerValToJsValue(toInnerVal(prop.defaultValue, prop.dataType, schemaVersion), prop.dataType) - } yield prop.name -> jsValue).toMap - - lazy val metaPropsDefaultMapInner = (for { - prop <- metaPropsInner if LabelMeta.isValidSeq(prop.seq) - jsValue <- innerValToJsValue(toInnerVal(prop.defaultValue, prop.dataType, schemaVersion), prop.dataType) - } yield prop.name -> jsValue).toMap - - def srcColumnWithDir(dir: Int) = { - if (dir == GraphUtil.directions("out")) srcColumn else tgtColumn - } - - def tgtColumnWithDir(dir: Int) = { - if (dir == GraphUtil.directions("out")) tgtColumn else srcColumn - } - - def srcTgtColumn(dir: Int) = - if (isDirected) { - (srcColumnWithDir(dir), tgtColumnWithDir(dir)) - } else { - if (dir == GraphUtil.directions("in")) { - (tgtColumn, srcColumn) - } else { - (srcColumn, tgtColumn) - } - } - - def init() = { - metas - metaSeqsToNames - service - srcColumn - tgtColumn - defaultIndex - indices - metaProps - } - - // def srcColumnInnerVal(jsValue: JsValue) = { - // jsValueToInnerVal(jsValue, srcColumnType, version) - // } - // def tgtColumnInnerVal(jsValue: JsValue) = { - // jsValueToInnerVal(jsValue, tgtColumnType, version) - // } - - override def toString(): String = { - val orderByKeys = LabelMeta.findAllByLabelId(id.get) - super.toString() + orderByKeys.toString() - } - - // def findLabelIndexSeq(scoring: List[(Byte, Double)]): Byte = { - // if (scoring.isEmpty) LabelIndex.defaultSeq - // else { - // LabelIndex.findByLabelIdAndSeqs(id.get, scoring.map(_._1).sorted).map(_.seq).getOrElse(LabelIndex.defaultSeq) - // - //// LabelIndex.findByLabelIdAndSeqs(id.get, scoring.map(_._1).sorted).map(_.seq).getOrElse(LabelIndex.defaultSeq) - // } - // } - lazy val toJson = Json.obj("labelName" -> label, - "from" -> srcColumn.toJson, "to" -> tgtColumn.toJson, - "isDirected" -> isDirected, - "serviceName" -> serviceName, - "consistencyLevel" -> consistencyLevel, - "schemaVersion" -> schemaVersion, - "isAsync" -> isAsync, - "compressionAlgorithm" -> compressionAlgorithm, - "defaultIndex" -> defaultIndex.map(x => x.toJson), - "extraIndex" -> extraIndices.map(exIdx => exIdx.toJson), - "metaProps" -> metaProps.filter { labelMeta => LabelMeta.isValidSeqForAdmin(labelMeta.seq) }.map(_.toJson) - ) - - -} - http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2core/src/main/scala/com/kakao/s2graph/core/mysqls/LabelIndex.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/mysqls/LabelIndex.scala b/s2core/src/main/scala/com/kakao/s2graph/core/mysqls/LabelIndex.scala deleted file mode 100644 index 47f4a2a..0000000 --- a/s2core/src/main/scala/com/kakao/s2graph/core/mysqls/LabelIndex.scala +++ /dev/null @@ -1,143 +0,0 @@ -package com.kakao.s2graph.core.mysqls - -/** - * Created by shon on 6/3/15. - */ - -import play.api.libs.json.Json -import scalikejdbc._ - -object LabelIndex extends Model[LabelIndex] { - val DefaultName = "_PK" - val DefaultMetaSeqs = Seq(LabelMeta.timeStampSeq) - val DefaultSeq = 1.toByte - val MaxOrderSeq = 7 - - def apply(rs: WrappedResultSet): LabelIndex = { - LabelIndex(rs.intOpt("id"), rs.int("label_id"), rs.string("name"), rs.byte("seq"), - rs.string("meta_seqs").split(",").filter(_ != "").map(s => s.toByte).toList match { - case metaSeqsList => metaSeqsList - }, - rs.string("formulars")) - } - - def findById(id: Int)(implicit session: DBSession = AutoSession) = { - val cacheKey = "id=" + id - withCache(cacheKey) { - sql"""select * from label_indices where id = ${id}""".map { rs => LabelIndex(rs) }.single.apply - }.get - } - - def findByLabelIdAll(labelId: Int, useCache: Boolean = true)(implicit session: DBSession = AutoSession) = { - val cacheKey = "labelId=" + labelId - if (useCache) { - withCaches(cacheKey)( sql""" - select * from label_indices where label_id = ${labelId} and seq > 0 order by seq ASC - """.map { rs => LabelIndex(rs) }.list.apply) - } else { - sql""" - select * from label_indices where label_id = ${labelId} and seq > 0 order by seq ASC - """.map { rs => LabelIndex(rs) }.list.apply - } - } - - def insert(labelId: Int, indexName: String, seq: Byte, metaSeqs: List[Byte], formulars: String)(implicit session: DBSession = AutoSession): Long = { - sql""" - insert into label_indices(label_id, name, seq, meta_seqs, formulars) - values (${labelId}, ${indexName}, ${seq}, ${metaSeqs.mkString(",")}, ${formulars}) - """ - .updateAndReturnGeneratedKey.apply() - } - - def findOrInsert(labelId: Int, indexName: String, metaSeqs: List[Byte], formulars: String)(implicit session: DBSession = AutoSession): LabelIndex = { - findByLabelIdAndSeqs(labelId, metaSeqs) match { - case Some(s) => s - case None => - val orders = findByLabelIdAll(labelId, false) - val seq = (orders.size + 1).toByte - assert(seq <= MaxOrderSeq) - val createdId = insert(labelId, indexName, seq, metaSeqs, formulars) - val cacheKeys = List(s"labelId=$labelId:seq=$seq", - s"labelId=$labelId:seqs=$metaSeqs", s"labelId=$labelId:seq=$seq", s"id=$createdId") - cacheKeys.foreach { key => - expireCache(key) - expireCaches(key) - } - - findByLabelIdAndSeq(labelId, seq).get - } - } - - def findByLabelIdAndSeqs(labelId: Int, seqs: List[Byte])(implicit session: DBSession = AutoSession): Option[LabelIndex] = { - val cacheKey = "labelId=" + labelId + ":seqs=" + seqs.mkString(",") - withCache(cacheKey) { - sql""" - select * from label_indices where label_id = ${labelId} and meta_seqs = ${seqs.mkString(",")} - """.map { rs => LabelIndex(rs) }.single.apply - } - } - - def findByLabelIdAndSeq(labelId: Int, seq: Byte, useCache: Boolean = true)(implicit session: DBSession = AutoSession): Option[LabelIndex] = { - // val cacheKey = s"labelId=$labelId:seq=$seq" - val cacheKey = "labelId=" + labelId + ":seq=" + seq - if (useCache) { - withCache(cacheKey)( sql""" - select * from label_indices where label_id = ${labelId} and seq = ${seq} - """.map { rs => LabelIndex(rs) }.single.apply) - } else { - sql""" - select * from label_indices where label_id = ${labelId} and seq = ${seq} - """.map { rs => LabelIndex(rs) }.single.apply - } - } - - def delete(id: Int)(implicit session: DBSession = AutoSession) = { - val labelIndex = findById(id) - val seqs = labelIndex.metaSeqs.mkString(",") - val (labelId, seq) = (labelIndex.labelId, labelIndex.seq) - sql"""delete from label_indices where id = ${id}""".execute.apply() - - val cacheKeys = List(s"id=$id", s"labelId=$labelId", s"labelId=$labelId:seq=$seq", s"labelId=$labelId:seqs=$seqs") - cacheKeys.foreach { key => - expireCache(key) - expireCaches(key) - } - } - - def findAll()(implicit session: DBSession = AutoSession) = { - val ls = sql"""select * from label_indices""".map { rs => LabelIndex(rs) }.list.apply - putsToCache(ls.map { x => - var cacheKey = s"id=${x.id.get}" - (cacheKey -> x) - }) - putsToCache(ls.map { x => - var cacheKey = s"labelId=${x.labelId}:seq=${x.seq}}" - (cacheKey -> x) - }) - putsToCache(ls.map { x => - var cacheKey = s"labelId=${x.labelId}:seqs=${x.metaSeqs.mkString(",")}" - (cacheKey -> x) - }) - putsToCaches(ls.groupBy(x => x.labelId).map { case (labelId, ls) => - val cacheKey = s"labelId=${labelId}" - (cacheKey -> ls) - }.toList) - } -} - -/** - * formular - * ex1): w1, w2, w3 - * ex2): 1.5 * w1^2 + 3.4 * (w1 * w2), w2, w1 - */ - -case class LabelIndex(id: Option[Int], labelId: Int, name: String, seq: Byte, metaSeqs: Seq[Byte], formulars: String) { - lazy val label = Label.findById(labelId) - lazy val metas = label.metaPropsMap - lazy val sortKeyTypes = metaSeqs.flatMap(metaSeq => label.metaPropsMap.get(metaSeq)) - lazy val propNames = sortKeyTypes.map { labelMeta => labelMeta.name } - lazy val toJson = Json.obj( - "name" -> name, - "propNames" -> sortKeyTypes.map(x => x.name) - ) -}
