http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/f2311f25/s2core/src/main/scala/com/kakao/s2graph/core/OrderingUtil.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/OrderingUtil.scala b/s2core/src/main/scala/com/kakao/s2graph/core/OrderingUtil.scala index 217b6eb..33ec7d9 100644 --- a/s2core/src/main/scala/com/kakao/s2graph/core/OrderingUtil.scala +++ b/s2core/src/main/scala/com/kakao/s2graph/core/OrderingUtil.scala @@ -3,9 +3,6 @@ package com.kakao.s2graph.core import com.kakao.s2graph.core.types.InnerValLike import play.api.libs.json.{JsNumber, JsString, JsValue} -/** - * Created by hsleep([email protected]) on 2015. 11. 5.. - */ object OrderingUtil { implicit object JsValueOrdering extends Ordering[JsValue] { @@ -38,9 +35,54 @@ object OrderingUtil { } } } + + def TupleMultiOrdering[T](ascendingLs: Seq[Boolean])(implicit ord: Ordering[T]): Ordering[(T, T, T, T)] = { + new Ordering[(T, T, T, T)] { + override def compare(tx: (T, T, T, T), ty: (T, T, T, T)): Int = { + val len = ascendingLs.length + val it = ascendingLs.iterator + if (len >= 1) { + val (x, y) = it.next() match { + case true => tx -> ty + case false => ty -> tx + } + val compare1 = ord.compare(x._1, y._1) + if (compare1 != 0) return compare1 + } + + if (len >= 2) { + val (x, y) = it.next() match { + case true => tx -> ty + case false => ty -> tx + } + val compare2 = ord.compare(x._2, y._2) + if (compare2 != 0) return compare2 + } + + if (len >= 3) { + val (x, y) = it.next() match { + case true => tx -> ty + case false => ty -> tx + } + val compare3 = ord.compare(x._3, y._3) + if (compare3 != 0) return compare3 + } + + if (len >= 4) { + val (x, y) = it.next() match { + case true => tx -> ty + case false => ty -> tx + } + val compare4 = ord.compare(x._4, y._4) + if (compare4 != 0) return compare4 + } + 0 + } + } + } } -class SeqMultiOrdering[T: Ordering](ascendingLs: Seq[Boolean], defaultAscending: Boolean = true)(implicit ord: Ordering[T]) extends Ordering[Seq[T]] { +class SeqMultiOrdering[T](ascendingLs: Seq[Boolean], defaultAscending: Boolean = true)(implicit ord: Ordering[T]) extends Ordering[Seq[T]] { override def compare(x: Seq[T], y: Seq[T]): Int = { val xe = x.iterator val ye = y.iterator @@ -60,45 +102,45 @@ class SeqMultiOrdering[T: Ordering](ascendingLs: Seq[Boolean], defaultAscending: } } -class TupleMultiOrdering[T: Ordering](ascendingLs: Seq[Boolean])(implicit ord: Ordering[T]) extends Ordering[(T, T, T, T)] { - override def compare(tx: (T, T, T, T), ty: (T, T, T, T)): Int = { - val len = ascendingLs.length - val it = ascendingLs.iterator - if (len >= 1) { - val (x, y) = it.next() match { - case true => tx -> ty - case false => ty -> tx - } - val compare1 = ord.compare(x._1, y._1) - if (compare1 != 0) return compare1 - } - - if (len >= 2) { - val (x, y) = it.next() match { - case true => tx -> ty - case false => ty -> tx - } - val compare2 = ord.compare(x._2, y._2) - if (compare2 != 0) return compare2 - } - - if (len >= 3) { - val (x, y) = it.next() match { - case true => tx -> ty - case false => ty -> tx - } - val compare3 = ord.compare(x._3, y._3) - if (compare3 != 0) return compare3 - } - - if (len >= 4) { - val (x, y) = it.next() match { - case true => tx -> ty - case false => ty -> tx - } - val compare4 = ord.compare(x._4, y._4) - if (compare4 != 0) return compare4 - } - 0 - } -} +//class TupleMultiOrdering[T](ascendingLs: Seq[Boolean])(implicit ord: Ordering[T]) extends Ordering[(T, T, T, T)] { +// override def 
compare(tx: (T, T, T, T), ty: (T, T, T, T)): Int = { +// val len = ascendingLs.length +// val it = ascendingLs.iterator +// if (len >= 1) { +// val (x, y) = it.next() match { +// case true => tx -> ty +// case false => ty -> tx +// } +// val compare1 = ord.compare(x._1, y._1) +// if (compare1 != 0) return compare1 +// } +// +// if (len >= 2) { +// val (x, y) = it.next() match { +// case true => tx -> ty +// case false => ty -> tx +// } +// val compare2 = ord.compare(x._2, y._2) +// if (compare2 != 0) return compare2 +// } +// +// if (len >= 3) { +// val (x, y) = it.next() match { +// case true => tx -> ty +// case false => ty -> tx +// } +// val compare3 = ord.compare(x._3, y._3) +// if (compare3 != 0) return compare3 +// } +// +// if (len >= 4) { +// val (x, y) = it.next() match { +// case true => tx -> ty +// case false => ty -> tx +// } +// val compare4 = ord.compare(x._4, y._4) +// if (compare4 != 0) return compare4 +// } +// 0 +// } +//}
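For reference, a minimal usage sketch of the relocated TupleMultiOrdering factory and the SeqMultiOrdering class above. The Double element type and the sample values are illustrative assumptions, not part of the patch, and the expected outputs assume that positions beyond ascendingLs fall back to defaultAscending in SeqMultiOrdering.

import com.kakao.s2graph.core.{OrderingUtil, SeqMultiOrdering}

object OrderingUtilSketch extends App {
  // Order 4-tuples: first column descending, second ascending; the remaining two
  // columns are never compared because ascendingLs has only two entries.
  val tupleOrdering = OrderingUtil.TupleMultiOrdering[Double](Seq(false, true))
  val tuples = Seq((1.0, 2.0, 0.0, 0.0), (2.0, 1.0, 0.0, 0.0), (2.0, 0.5, 0.0, 0.0))
  println(tuples.sorted(tupleOrdering))
  // expected: List((2.0,0.5,0.0,0.0), (2.0,1.0,0.0,0.0), (1.0,2.0,0.0,0.0))

  // Same ordering rules applied to variable-length rows.
  val seqOrdering = new SeqMultiOrdering[Double](Seq(false, true))
  val rows = Seq(Seq(1.0, 2.0), Seq(2.0, 1.0), Seq(2.0, 0.5))
  println(rows.sorted(seqOrdering))
  // expected: List(List(2.0, 0.5), List(2.0, 1.0), List(1.0, 2.0))
}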
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/f2311f25/s2core/src/main/scala/com/kakao/s2graph/core/PostProcess.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/PostProcess.scala b/s2core/src/main/scala/com/kakao/s2graph/core/PostProcess.scala new file mode 100644 index 0000000..0a26d26 --- /dev/null +++ b/s2core/src/main/scala/com/kakao/s2graph/core/PostProcess.scala @@ -0,0 +1,439 @@ +package com.kakao.s2graph.core + +import com.kakao.s2graph.core.GraphExceptions.BadQueryException +import com.kakao.s2graph.core._ +import com.kakao.s2graph.core.mysqls._ +import com.kakao.s2graph.core.types.{InnerVal, InnerValLike} +import play.api.libs.json.{Json, _} + +import scala.collection.mutable.ListBuffer + +object PostProcess extends JSONParser { + /** + * Result Entity score field name + */ + val timeoutResults = Json.obj("size" -> 0, "results" -> Json.arr(), "isTimeout" -> true) + val emptyResults = Json.obj("size" -> 0, "results" -> Json.arr(), "isEmpty" -> true) + def badRequestResults(ex: => Exception) = ex match { + case ex: BadQueryException => Json.obj("message" -> ex.msg) + case _ => Json.obj("message" -> ex.getMessage) + } + + val SCORE_FIELD_NAME = "scoreSum" + val reservedColumns = Set("cacheRemain", "from", "to", "label", "direction", "_timestamp", "timestamp", "score", "props") + + def groupEdgeResult(queryRequestWithResultLs: Seq[QueryRequestWithResult], exclude: Seq[QueryRequestWithResult]) = { + val excludeIds = resultInnerIds(exclude).map(innerId => innerId -> true).toMap + // filterNot {case (edge, score) => edge.props.contains(LabelMeta.degreeSeq)} + val groupedEdgesWithRank = (for { + queryRequestWithResult <- queryRequestWithResultLs + (queryRequest, queryResult) = QueryRequestWithResult.unapply(queryRequestWithResult).get + edgeWithScore <- queryResult.edgeWithScoreLs + (edge, score) = EdgeWithScore.unapply(edgeWithScore).get + if !excludeIds.contains(toHashKey(edge, queryRequest.queryParam, queryRequest.query.filterOutFields)) + } yield { + (queryRequest.queryParam, edge, score) + }).groupBy { + case (queryParam, edge, rank) if edge.labelWithDir.dir == GraphUtil.directions("in") => + (queryParam.label.srcColumn, queryParam.label.label, queryParam.label.tgtColumn, edge.tgtVertex.innerId, edge.isDegree) + case (queryParam, edge, rank) => + (queryParam.label.tgtColumn, queryParam.label.label, queryParam.label.srcColumn, edge.tgtVertex.innerId, edge.isDegree) + } + + val ret = for { + ((tgtColumn, labelName, srcColumn, target, isDegreeEdge), edgesAndRanks) <- groupedEdgesWithRank if !isDegreeEdge + edgesWithRanks = edgesAndRanks.groupBy(x => x._2.srcVertex).map(_._2.head) + id <- innerValToJsValue(target, tgtColumn.columnType) + } yield { + Json.obj("name" -> tgtColumn.columnName, "id" -> id, + SCORE_FIELD_NAME -> edgesWithRanks.map(_._3).sum, + "label" -> labelName, + "aggr" -> Json.obj( + "name" -> srcColumn.columnName, + "ids" -> edgesWithRanks.flatMap { case (queryParam, edge, rank) => + innerValToJsValue(edge.srcVertex.innerId, srcColumn.columnType) + }, + "edges" -> edgesWithRanks.map { case (queryParam, edge, rank) => + Json.obj("id" -> innerValToJsValue(edge.srcVertex.innerId, srcColumn.columnType), + "props" -> propsToJson(edge), + "score" -> rank + ) + } + ) + ) + } + + ret.toList + } + + def sortWithFormatted(jsons: Seq[JsObject], + scoreField: String = "scoreSum", + queryRequestWithResultLs: Seq[QueryRequestWithResult], + decrease: Boolean = true): JsObject = { + 
val ordering = if (decrease) -1 else 1 + val sortedJsons = jsons.sortBy { jsObject => (jsObject \ scoreField).as[Double] * ordering } + + if (queryRequestWithResultLs.isEmpty) Json.obj("size" -> sortedJsons.size, "results" -> sortedJsons) + else Json.obj( + "size" -> sortedJsons.size, + "results" -> sortedJsons, + "impressionId" -> queryRequestWithResultLs.head.queryRequest.query.impressionId() + ) + } + + private def toHashKey(edge: Edge, queryParam: QueryParam, fields: Seq[String], delimiter: String = ","): Int = { + val ls = for { + field <- fields + } yield { + field match { + case "from" | "_from" => edge.srcVertex.innerId + case "to" | "_to" => edge.tgtVertex.innerId + case "label" => edge.labelWithDir.labelId + case "direction" => JsString(GraphUtil.fromDirection(edge.labelWithDir.dir)) + case "_timestamp" | "timestamp" => edge.ts + case _ => + queryParam.label.metaPropsInvMap.get(field) match { + case None => throw new RuntimeException(s"unknow column: $field") + case Some(labelMeta) => edge.propsWithTs.get(labelMeta.seq) match { + case None => labelMeta.defaultValue + case Some(propVal) => propVal + } + } + } + } + val ret = ls.hashCode() + ret + } + + def resultInnerIds(queryRequestWithResultLs: Seq[QueryRequestWithResult], isSrcVertex: Boolean = false): Seq[Int] = { + for { + queryRequestWithResult <- queryRequestWithResultLs + (queryRequest, queryResult) = QueryRequestWithResult.unapply(queryRequestWithResult).get + q = queryRequest.query + edgeWithScore <- queryResult.edgeWithScoreLs + (edge, score) = EdgeWithScore.unapply(edgeWithScore).get + } yield toHashKey(edge, queryRequest.queryParam, q.filterOutFields) + } + + def summarizeWithListExcludeFormatted(queryRequestWithResultLs: Seq[QueryRequestWithResult], exclude: Seq[QueryRequestWithResult]) = { + val jsons = groupEdgeResult(queryRequestWithResultLs, exclude) + sortWithFormatted(jsons, queryRequestWithResultLs = queryRequestWithResultLs, decrease = true) + } + + def summarizeWithList(queryRequestWithResultLs: Seq[QueryRequestWithResult], exclude: Seq[QueryRequestWithResult]) = { + val jsons = groupEdgeResult(queryRequestWithResultLs, exclude) + sortWithFormatted(jsons, queryRequestWithResultLs = queryRequestWithResultLs) + } + + def summarizeWithListFormatted(queryRequestWithResultLs: Seq[QueryRequestWithResult], exclude: Seq[QueryRequestWithResult]) = { + val jsons = groupEdgeResult(queryRequestWithResultLs, exclude) + sortWithFormatted(jsons, queryRequestWithResultLs = queryRequestWithResultLs) + } + + def toSimpleVertexArrJson(queryRequestWithResultLs: Seq[QueryRequestWithResult]): JsValue = { + toSimpleVertexArrJson(queryRequestWithResultLs, Seq.empty[QueryRequestWithResult]) + } + + private def orderBy(q: Query, + orderByColumns: Seq[(String, Boolean)], + rawEdges: ListBuffer[(Map[String, JsValue], Double, (Any, Any, Any, Any))]) + : ListBuffer[(Map[String, JsValue], Double, (Any, Any, Any, Any))] = { + import com.kakao.s2graph.core.OrderingUtil._ + + if (q.withScore && orderByColumns.nonEmpty) { + val ascendingLs = orderByColumns.map(_._2) + rawEdges.sortBy(_._3)(OrderingUtil.TupleMultiOrdering[Any](ascendingLs)) + } else { + rawEdges + } + } + + private def getColumnValue(keyWithJs: Map[String, JsValue], score: Double, edge: Edge, column: String): Any = { + column match { + case "score" => score + case "timestamp" | "_timestamp" => edge.ts + case _ => + keyWithJs.get(column) match { + case None => keyWithJs.get("props").map { js => (js \ column).as[JsValue] }.get + case Some(x) => x + } + } + } + + def 
toSimpleVertexArrJson(queryRequestWithResultLs: Seq[QueryRequestWithResult], exclude: Seq[QueryRequestWithResult]): JsValue = { + val excludeIds = resultInnerIds(exclude).map(innerId => innerId -> true).toMap + + val degrees = ListBuffer[JsValue]() + val rawEdges = ListBuffer[(Map[String, JsValue], Double, (Any, Any, Any, Any))]() + + if (queryRequestWithResultLs.isEmpty) { + Json.obj("size" -> 0, "degrees" -> Json.arr(), "results" -> Json.arr()) + } else { + val (queryRequest, queryResult) = QueryRequestWithResult.unapply(queryRequestWithResultLs.head).get + val query = queryRequest.query + val queryParam = queryRequest.queryParam + + val orderByColumns = query.orderByColumns.filter { case (column, _) => + column match { + case "from" | "to" | "label" | "score" | "timestamp" | "_timestamp" => true + case _ => + queryParam.label.metaPropNames.contains(column) + } + } + + /** build result jsons */ + for { + queryRequestWithResult <- queryRequestWithResultLs + (queryRequest, queryResult) = QueryRequestWithResult.unapply(queryRequestWithResult).get + queryParam = queryRequest.queryParam + edgeWithScore <- queryResult.edgeWithScoreLs + (edge, score) = EdgeWithScore.unapply(edgeWithScore).get + if !excludeIds.contains(toHashKey(edge, queryRequest.queryParam, query.filterOutFields)) + } { + // edge to json + val (srcColumn, _) = queryParam.label.srcTgtColumn(edge.labelWithDir.dir) + val fromOpt = innerValToJsValue(edge.srcVertex.id.innerId, srcColumn.columnType) + if (edge.isDegree && fromOpt.isDefined) { + degrees += Json.obj( + "from" -> fromOpt.get, + "label" -> queryRequest.queryParam.label.label, + "direction" -> GraphUtil.fromDirection(edge.labelWithDir.dir), + LabelMeta.degree.name -> innerValToJsValue(edge.propsWithTs(LabelMeta.degreeSeq).innerVal, InnerVal.LONG) + ) + } else { + val keyWithJs = edgeToJson(edge, score, queryRequest.query, queryRequest.queryParam) + val orderByValues: (Any, Any, Any, Any) = orderByColumns.length match { + case 0 => + (None, None, None, None) + case 1 => + val it = orderByColumns.iterator + val v1 = getColumnValue(keyWithJs, score, edge, it.next()._1) + (v1, None, None, None) + case 2 => + val it = orderByColumns.iterator + val v1 = getColumnValue(keyWithJs, score, edge, it.next()._1) + val v2 = getColumnValue(keyWithJs, score, edge, it.next()._1) + (v1, v2, None, None) + case 3 => + val it = orderByColumns.iterator + val v1 = getColumnValue(keyWithJs, score, edge, it.next()._1) + val v2 = getColumnValue(keyWithJs, score, edge, it.next()._1) + val v3 = getColumnValue(keyWithJs, score, edge, it.next()._1) + (v1, v2, v3, None) + case _ => + val it = orderByColumns.iterator + val v1 = getColumnValue(keyWithJs, score, edge, it.next()._1) + val v2 = getColumnValue(keyWithJs, score, edge, it.next()._1) + val v3 = getColumnValue(keyWithJs, score, edge, it.next()._1) + val v4 = getColumnValue(keyWithJs, score, edge, it.next()._1) + (v1, v2, v3, v4) + } + + val currentEdge = (keyWithJs, score, orderByValues) + rawEdges += currentEdge + } + } + + if (query.groupByColumns.isEmpty) { + // ordering + val edges = orderBy(query, orderByColumns, rawEdges).map(_._1) + + Json.obj( + "size" -> edges.size, + "degrees" -> degrees, + "results" -> edges, + "impressionId" -> query.impressionId() + ) + } else { + val grouped = rawEdges.groupBy { case (keyWithJs, _, _) => + val props = keyWithJs.get("props") + + for { + column <- query.groupByColumns + value <- keyWithJs.get(column) match { + case None => props.flatMap { js => (js \ column).asOpt[JsValue] } + case Some(x) => 
Some(x) + } + } yield column -> value + } + + val groupedEdges = { + for { + (groupByKeyVals, groupedRawEdges) <- grouped + } yield { + val scoreSum = groupedRawEdges.map(x => x._2).sum + // ordering + val edges = orderBy(query, orderByColumns, groupedRawEdges).map(_._1) + Json.obj( + "groupBy" -> Json.toJson(groupByKeyVals.toMap), + "scoreSum" -> scoreSum, + "agg" -> edges + ) + } + } + + val groupedSortedJsons = groupedEdges.toList.sortBy { jsVal => -1 * (jsVal \ "scoreSum").as[Double] } + Json.obj( + "size" -> groupedEdges.size, + "degrees" -> degrees, + "results" -> groupedSortedJsons, + "impressionId" -> query.impressionId() + ) + } + } + } + + def verticesToJson(vertices: Iterable[Vertex]) = { + Json.toJson(vertices.flatMap { v => vertexToJson(v) }) + } + + def propsToJson(edge: Edge, q: Query, queryParam: QueryParam): Map[String, JsValue] = { + for { + (seq, innerValWithTs) <- edge.propsWithTs if LabelMeta.isValidSeq(seq) + labelMeta <- queryParam.label.metaPropsMap.get(seq) + jsValue <- innerValToJsValue(innerValWithTs.innerVal, labelMeta.dataType) + } yield labelMeta.name -> jsValue + } + + private def edgeParent(parentEdges: Seq[EdgeWithScore], q: Query, queryParam: QueryParam): JsValue = { + if (parentEdges.isEmpty) { + JsNull + } else { + val parents = for { + parent <- parentEdges + (parentEdge, parentScore) = EdgeWithScore.unapply(parent).get + parentQueryParam = QueryParam(parentEdge.labelWithDir) + parents = edgeParent(parentEdge.parentEdges, q, parentQueryParam) if parents != JsNull + } yield { + val originalEdge = parentEdge.originalEdgeOpt.getOrElse(parentEdge) + val edgeJson = edgeToJsonInner(originalEdge, parentScore, q, parentQueryParam) + ("parents" -> parents) + Json.toJson(edgeJson) + } + + Json.toJson(parents) + } + } + + /** TODO */ + def edgeToJsonInner(edge: Edge, score: Double, q: Query, queryParam: QueryParam): Map[String, JsValue] = { + val (srcColumn, tgtColumn) = queryParam.label.srcTgtColumn(edge.labelWithDir.dir) + + val kvMapOpt = for { + from <- innerValToJsValue(edge.srcVertex.id.innerId, srcColumn.columnType) + to <- innerValToJsValue(edge.tgtVertex.id.innerId, tgtColumn.columnType) + } yield { + val targetColumns = if (q.selectColumnsSet.isEmpty) reservedColumns else (reservedColumns & q.selectColumnsSet) + "props" + + val _propsMap = queryParam.label.metaPropsDefaultMapInner ++ propsToJson(edge, q, queryParam) + val propsMap = if (q.selectColumnsSet.nonEmpty) _propsMap.filterKeys(q.selectColumnsSet) else _propsMap + + val kvMap = targetColumns.foldLeft(Map.empty[String, JsValue]) { (map, column) => + val jsValue = column match { + case "cacheRemain" => JsNumber(queryParam.cacheTTLInMillis - (System.currentTimeMillis() - queryParam.timestamp)) + case "from" => from + case "to" => to + case "label" => JsString(queryParam.label.label) + case "direction" => JsString(GraphUtil.fromDirection(edge.labelWithDir.dir)) + case "_timestamp" | "timestamp" => JsNumber(edge.ts) + case "score" => JsNumber(score) + case "props" if propsMap.nonEmpty => Json.toJson(propsMap) + case _ => JsNull + } + + if (jsValue == JsNull) map else map + (column -> jsValue) + } + kvMap + } + + kvMapOpt.getOrElse(Map.empty) + } + + def edgeToJson(edge: Edge, score: Double, q: Query, queryParam: QueryParam): Map[String, JsValue] = { + val kvs = edgeToJsonInner(edge, score, q, queryParam) + if (kvs.nonEmpty && q.returnTree) kvs + ("parents" -> Json.toJson(edgeParent(edge.parentEdges, q, queryParam))) + else kvs + } + + def vertexToJson(vertex: Vertex): Option[JsObject] = { + val 
serviceColumn = ServiceColumn.findById(vertex.id.colId) + + for { + id <- innerValToJsValue(vertex.innerId, serviceColumn.columnType) + } yield { + Json.obj("serviceName" -> serviceColumn.service.serviceName, + "columnName" -> serviceColumn.columnName, + "id" -> id, "props" -> propsToJson(vertex), + "timestamp" -> vertex.ts, + // "belongsTo" -> vertex.belongLabelIds) + "belongsTo" -> vertex.belongLabelIds.flatMap(Label.findByIdOpt(_).map(_.label))) + } + } + + private def keysToName(seqsToNames: Map[Int, String], props: Map[Int, InnerValLike]) = { + for { + (seq, value) <- props + name <- seqsToNames.get(seq) + } yield (name, value) + } + + private def propsToJson(vertex: Vertex) = { + val serviceColumn = vertex.serviceColumn + val props = for { + (propKey, innerVal) <- vertex.props + columnMeta <- ColumnMeta.findByIdAndSeq(serviceColumn.id.get, propKey.toByte, useCache = true) + jsValue <- innerValToJsValue(innerVal, columnMeta.dataType) + } yield { + (columnMeta.name -> jsValue) + } + props.toMap + } + + @deprecated(message = "deprecated", since = "0.2") + def propsToJson(edge: Edge) = { + for { + (seq, v) <- edge.propsWithTs if LabelMeta.isValidSeq(seq) + metaProp <- edge.label.metaPropsMap.get(seq) + jsValue <- innerValToJsValue(v.innerVal, metaProp.dataType) + } yield { + (metaProp.name, jsValue) + } + } + + @deprecated(message = "deprecated", since = "0.2") + def summarizeWithListExclude(queryRequestWithResultLs: Seq[QueryRequestWithResult], exclude: Seq[QueryRequestWithResult]): JsObject = { + val excludeIds = resultInnerIds(exclude).map(innerId => innerId -> true).toMap + + + val groupedEdgesWithRank = (for { + queryRequestWithResult <- queryRequestWithResultLs + (queryRequest, queryResult) = QueryRequestWithResult.unapply(queryRequestWithResult).get + edgeWithScore <- queryResult.edgeWithScoreLs + (edge, score) = EdgeWithScore.unapply(edgeWithScore).get + if !excludeIds.contains(toHashKey(edge, queryRequest.queryParam, queryRequest.query.filterOutFields)) + } yield { + (edge, score) + }).groupBy { case (edge, score) => + (edge.label.tgtColumn, edge.label.srcColumn, edge.tgtVertex.innerId) + } + + val jsons = for { + ((tgtColumn, srcColumn, target), edgesAndRanks) <- groupedEdgesWithRank + (edges, ranks) = edgesAndRanks.groupBy(x => x._1.srcVertex).map(_._2.head).unzip + tgtId <- innerValToJsValue(target, tgtColumn.columnType) + } yield { + Json.obj(tgtColumn.columnName -> tgtId, + s"${srcColumn.columnName}s" -> + edges.flatMap(edge => innerValToJsValue(edge.srcVertex.innerId, srcColumn.columnType)), "scoreSum" -> ranks.sum) + } + val sortedJsons = jsons.toList.sortBy { jsObj => (jsObj \ "scoreSum").as[Double] }.reverse + if (queryRequestWithResultLs.isEmpty) { + Json.obj("size" -> sortedJsons.size, "results" -> sortedJsons) + } else { + Json.obj("size" -> sortedJsons.size, "results" -> sortedJsons, + "impressionId" -> queryRequestWithResultLs.head.queryRequest.query.impressionId()) + } + + } + + +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/f2311f25/s2core/src/main/scala/com/kakao/s2graph/core/mysqls/Model.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/mysqls/Model.scala b/s2core/src/main/scala/com/kakao/s2graph/core/mysqls/Model.scala index f40afab..d41e9fa 100644 --- a/s2core/src/main/scala/com/kakao/s2graph/core/mysqls/Model.scala +++ b/s2core/src/main/scala/com/kakao/s2graph/core/mysqls/Model.scala @@ -54,6 +54,19 @@ object Model { res } } + + def shutdown() = { + 
ConnectionPool.closeAll() + } + + def loadCache() = { + Service.findAll() + ServiceColumn.findAll() + Label.findAll() + LabelMeta.findAll() + LabelIndex.findAll() + ColumnMeta.findAll() + } } trait Model[V] extends SQLSyntaxSupport[V] { http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/f2311f25/s2core/src/main/scala/com/kakao/s2graph/core/rest/RequestParser.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/rest/RequestParser.scala b/s2core/src/main/scala/com/kakao/s2graph/core/rest/RequestParser.scala new file mode 100644 index 0000000..98cc68b --- /dev/null +++ b/s2core/src/main/scala/com/kakao/s2graph/core/rest/RequestParser.scala @@ -0,0 +1,509 @@ +package com.kakao.s2graph.core.rest + +import com.kakao.s2graph.core.GraphExceptions.{BadQueryException, ModelNotFoundException} +import com.kakao.s2graph.core._ +import com.kakao.s2graph.core.mysqls._ +import com.kakao.s2graph.core.parsers.WhereParser +import com.kakao.s2graph.core.types._ +import com.typesafe.config.Config +import play.api.libs.json._ + +import scala.util.{Failure, Success, Try} + +class RequestParser(config: Config) extends JSONParser { + + import Management.JsonModel._ + + val hardLimit = 100000 + val defaultLimit = 100 + val DefaultRpcTimeout = config.getInt("hbase.rpc.timeout") + val DefaultMaxAttempt = config.getInt("hbase.client.retries.number") + val DefaultCluster = config.getString("hbase.zookeeper.quorum") + val DefaultCompressionAlgorithm = config.getString("hbase.table.compression.algorithm") + val DefaultPhase = config.getString("phase") + + private def extractScoring(labelId: Int, value: JsValue) = { + val ret = for { + js <- parse[Option[JsObject]](value, "scoring") + } yield { + for { + (k, v) <- js.fields + labelOrderType <- LabelMeta.findByName(labelId, k) + } yield { + val value = v match { + case n: JsNumber => n.as[Double] + case _ => throw new Exception("scoring weight should be double.") + } + (labelOrderType.seq, value) + } + } + ret + } + + def extractInterval(label: Label, jsValue: JsValue) = { + def extractKv(js: JsValue) = js match { + case JsObject(obj) => obj + case JsArray(arr) => arr.flatMap { + case JsObject(obj) => obj + case _ => throw new RuntimeException(s"cannot support json type $js") + } + case _ => throw new RuntimeException(s"cannot support json type: $js") + } + + val ret = for { + js <- parse[Option[JsObject]](jsValue, "interval") + fromJs <- (js \ "from").asOpt[JsValue] + toJs <- (js \ "to").asOpt[JsValue] + } yield { + val from = Management.toProps(label, extractKv(fromJs)) + val to = Management.toProps(label, extractKv(toJs)) + (from, to) + } + + ret + } + + def extractDuration(label: Label, jsValue: JsValue) = { + for { + js <- parse[Option[JsObject]](jsValue, "duration") + } yield { + val minTs = parse[Option[Long]](js, "from").getOrElse(Long.MaxValue) + val maxTs = parse[Option[Long]](js, "to").getOrElse(Long.MinValue) + + (minTs, maxTs) + } + } + + def extractHas(label: Label, jsValue: JsValue) = { + val ret = for { + js <- parse[Option[JsObject]](jsValue, "has") + } yield { + for { + (k, v) <- js.fields + labelMeta <- LabelMeta.findByName(label.id.get, k) + value <- jsValueToInnerVal(v, labelMeta.dataType, label.schemaVersion) + } yield { + labelMeta.seq -> value + } + } + ret.map(_.toMap).getOrElse(Map.empty[Byte, InnerValLike]) + } + + def extractWhere(labelMap: Map[String, Label], jsValue: JsValue) = { + (jsValue \ "where").asOpt[String] match { + case None => 
Success(WhereParser.success) + case Some(where) => + WhereParser(labelMap).parse(where) match { + case s@Success(_) => s + case Failure(ex) => throw BadQueryException(ex.getMessage, ex) + } + } + } + + def toVertices(labelName: String, direction: String, ids: Seq[JsValue]): Seq[Vertex] = { + val vertices = for { + label <- Label.findByName(labelName).toSeq + serviceColumn = if (direction == "out") label.srcColumn else label.tgtColumn + id <- ids + innerId <- jsValueToInnerVal(id, serviceColumn.columnType, label.schemaVersion) + } yield { + Vertex(SourceVertexId(serviceColumn.id.get, innerId), System.currentTimeMillis()) + } + vertices.toSeq + } + + def toQuery(jsValue: JsValue, isEdgeQuery: Boolean = true): Query = { + try { + val vertices = + (for { + value <- parse[List[JsValue]](jsValue, "srcVertices") + serviceName = parse[String](value, "serviceName") + column = parse[String](value, "columnName") + } yield { + val service = Service.findByName(serviceName).getOrElse(throw BadQueryException("service not found")) + val col = ServiceColumn.find(service.id.get, column).getOrElse(throw BadQueryException("bad column name")) + val (idOpt, idsOpt) = ((value \ "id").asOpt[JsValue], (value \ "ids").asOpt[List[JsValue]]) + for { + idVal <- idOpt ++ idsOpt.toSeq.flatten + + /** bug, need to use labels schemaVersion */ + innerVal <- jsValueToInnerVal(idVal, col.columnType, col.schemaVersion) + } yield { + Vertex(SourceVertexId(col.id.get, innerVal), System.currentTimeMillis()) + } + }).flatten + + if (vertices.isEmpty) throw BadQueryException("srcVertices`s id is empty") + + val filterOutFields = (jsValue \ "filterOutFields").asOpt[List[String]].getOrElse(List(LabelMeta.to.name)) + val filterOutQuery = (jsValue \ "filterOut").asOpt[JsValue].map { v => toQuery(v) }.map { q => q.copy(filterOutFields = filterOutFields) } + val steps = parse[Vector[JsValue]](jsValue, "steps") + val removeCycle = (jsValue \ "removeCycle").asOpt[Boolean].getOrElse(true) + val selectColumns = (jsValue \ "select").asOpt[List[String]].getOrElse(List.empty) + val groupByColumns = (jsValue \ "groupBy").asOpt[List[String]].getOrElse(List.empty) + val orderByColumns: List[(String, Boolean)] = (jsValue \ "orderBy").asOpt[List[JsObject]].map { jsLs => + for { + js <- jsLs + (column, orderJs) <- js.fields + } yield { + val ascending = orderJs.as[String].toUpperCase match { + case "ASC" => true + case "DESC" => false + } + column -> ascending + } + }.getOrElse(List("score" -> false, "timestamp" -> false)) + val withScore = (jsValue \ "withScore").asOpt[Boolean].getOrElse(true) + val returnTree = (jsValue \ "returnTree").asOpt[Boolean].getOrElse(false) + + // TODO: throw exception, when label dosn't exist + val labelMap = (for { + js <- jsValue \\ "label" + labelName <- js.asOpt[String] + label <- Label.findByName(labelName) + } yield (labelName, label)).toMap + + val querySteps = + steps.zipWithIndex.map { case (step, stepIdx) => + val labelWeights = step match { + case obj: JsObject => + val converted = for { + (k, v) <- (obj \ "weights").asOpt[JsObject].getOrElse(Json.obj()).fields + l <- Label.findByName(k) + } yield { + l.id.get -> v.toString().toDouble + } + converted.toMap + case _ => Map.empty[Int, Double] + } + val queryParamJsVals = step match { + case arr: JsArray => arr.as[List[JsValue]] + case obj: JsObject => (obj \ "step").as[List[JsValue]] + case _ => List.empty[JsValue] + } + val nextStepScoreThreshold = step match { + case obj: JsObject => (obj \ 
"nextStepThreshold").asOpt[Double].getOrElse(QueryParam.DefaultThreshold) + case _ => QueryParam.DefaultThreshold + } + val nextStepLimit = step match { + case obj: JsObject => (obj \ "nextStepLimit").asOpt[Int].getOrElse(-1) + case _ => -1 + } + val cacheTTL = step match { + case obj: JsObject => (obj \ "cacheTTL").asOpt[Int].getOrElse(-1) + case _ => -1 + } + val queryParams = + for { + labelGroup <- queryParamJsVals + queryParam <- parseQueryParam(labelMap, labelGroup) + } yield { + val (_, columnName) = + if (queryParam.labelWithDir.dir == GraphUtil.directions("out")) { + (queryParam.label.srcService.serviceName, queryParam.label.srcColumnName) + } else { + (queryParam.label.tgtService.serviceName, queryParam.label.tgtColumnName) + } + //FIXME: + if (stepIdx == 0 && vertices.nonEmpty && !vertices.exists(v => v.serviceColumn.columnName == columnName)) { + throw BadQueryException("srcVertices contains incompatiable serviceName or columnName with first step.") + } + + queryParam + } + Step(queryParams.toList, labelWeights = labelWeights, + // scoreThreshold = stepThreshold, + nextStepScoreThreshold = nextStepScoreThreshold, + nextStepLimit = nextStepLimit, + cacheTTL = cacheTTL) + + } + + val ret = Query( + vertices, + querySteps, + removeCycle = removeCycle, + selectColumns = selectColumns, + groupByColumns = groupByColumns, + orderByColumns = orderByColumns, + filterOutQuery = filterOutQuery, + filterOutFields = filterOutFields, + withScore = withScore, + returnTree = returnTree + ) + // logger.debug(ret.toString) + ret + } catch { + case e: BadQueryException => + throw e + case e: ModelNotFoundException => + throw BadQueryException(e.getMessage, e) + case e: Exception => + throw BadQueryException(s"$jsValue, $e", e) + } + } + + private def parseQueryParam(labelMap: Map[String, Label], labelGroup: JsValue): Option[QueryParam] = { + for { + labelName <- parse[Option[String]](labelGroup, "label") + } yield { + val label = Label.findByName(labelName).getOrElse(throw BadQueryException(s"$labelName not found")) + val direction = parse[Option[String]](labelGroup, "direction").map(GraphUtil.toDirection(_)).getOrElse(0) + val limit = { + parse[Option[Int]](labelGroup, "limit") match { + case None => defaultLimit + case Some(l) if l < 0 => Int.MaxValue + case Some(l) if l >= 0 => + val default = hardLimit + Math.min(l, default) + } + } + val offset = parse[Option[Int]](labelGroup, "offset").getOrElse(0) + val interval = extractInterval(label, labelGroup) + val duration = extractDuration(label, labelGroup) + val scoring = extractScoring(label.id.get, labelGroup).getOrElse(List.empty[(Byte, Double)]).toList + val exclude = parse[Option[Boolean]](labelGroup, "exclude").getOrElse(false) + val include = parse[Option[Boolean]](labelGroup, "include").getOrElse(false) + val hasFilter = extractHas(label, labelGroup) + val labelWithDir = LabelWithDirection(label.id.get, direction) + val indexNameOpt = (labelGroup \ "index").asOpt[String] + val indexSeq = indexNameOpt match { + case None => label.indexSeqsMap.get(scoring.map(kv => kv._1)).map(_.seq).getOrElse(LabelIndex.DefaultSeq) + case Some(indexName) => label.indexNameMap.get(indexName).map(_.seq).getOrElse(throw new RuntimeException("cannot find index")) + } + val where = extractWhere(labelMap, labelGroup) + val includeDegree = (labelGroup \ "includeDegree").asOpt[Boolean].getOrElse(true) + val rpcTimeout = (labelGroup \ "rpcTimeout").asOpt[Int].getOrElse(DefaultRpcTimeout) + val maxAttempt = (labelGroup \ 
"maxAttempt").asOpt[Int].getOrElse(DefaultMaxAttempt) + val tgtVertexInnerIdOpt = (labelGroup \ "_to").asOpt[JsValue].flatMap { jsVal => + jsValueToInnerVal(jsVal, label.tgtColumnWithDir(direction).columnType, label.schemaVersion) + } + val cacheTTL = (labelGroup \ "cacheTTL").asOpt[Long].getOrElse(-1L) + val timeDecayFactor = (labelGroup \ "timeDecay").asOpt[JsObject].map { jsVal => + val initial = (jsVal \ "initial").asOpt[Double].getOrElse(1.0) + val decayRate = (jsVal \ "decayRate").asOpt[Double].getOrElse(0.1) + if (decayRate >= 1.0 || decayRate <= 0.0) throw new BadQueryException("decay rate should be 0.0 ~ 1.0") + val timeUnit = (jsVal \ "timeUnit").asOpt[Double].getOrElse(60 * 60 * 24.0) + TimeDecay(initial, decayRate, timeUnit) + } + val threshold = (labelGroup \ "threshold").asOpt[Double].getOrElse(QueryParam.DefaultThreshold) + // TODO: refactor this. dirty + val duplicate = parse[Option[String]](labelGroup, "duplicate").map(s => Query.DuplicatePolicy(s)) + + val outputField = (labelGroup \ "outputField").asOpt[String].map(s => Json.arr(Json.arr(s))) + val transformer = if (outputField.isDefined) outputField else (labelGroup \ "transform").asOpt[JsValue] + val scorePropagateOp = (labelGroup \ "scorePropagateOp").asOpt[String].getOrElse("multiply") + val sample = (labelGroup \ "sample").asOpt[Int].getOrElse(-1) + + // FIXME: Order of command matter + QueryParam(labelWithDir) + .sample(sample) + .limit(offset, limit) + .rank(RankParam(label.id.get, scoring)) + .exclude(exclude) + .include(include) + .interval(interval) + .duration(duration) + .has(hasFilter) + .labelOrderSeq(indexSeq) + .interval(interval) + .where(where) + .duplicatePolicy(duplicate) + .includeDegree(includeDegree) + .rpcTimeout(rpcTimeout) + .maxAttempt(maxAttempt) + .tgtVertexInnerIdOpt(tgtVertexInnerIdOpt) + .cacheTTLInMillis(cacheTTL) + .timeDecay(timeDecayFactor) + .threshold(threshold) + .transformer(transformer) + .scorePropagateOp(scorePropagateOp) + } + } + + private def parse[R](js: JsValue, key: String)(implicit read: Reads[R]): R = { + (js \ key).validate[R] + .fold( + errors => { + val msg = (JsError.toFlatJson(errors) \ "obj").as[List[JsValue]].map(x => x \ "msg") + val e = Json.obj("args" -> key, "error" -> msg) + throw new GraphExceptions.JsonParseException(Json.obj("error" -> key).toString) + }, + r => { + r + }) + } + + def toJsValues(jsValue: JsValue): List[JsValue] = { + jsValue match { + case obj: JsObject => List(obj) + case arr: JsArray => arr.as[List[JsValue]] + case _ => List.empty[JsValue] + } + + } + + def toEdges(jsValue: JsValue, operation: String): List[Edge] = { + toJsValues(jsValue).map(toEdge(_, operation)) + } + + def toEdge(jsValue: JsValue, operation: String) = { + + val srcId = parse[JsValue](jsValue, "from") match { + case s: JsString => s.as[String] + case o@_ => s"${o}" + } + val tgtId = parse[JsValue](jsValue, "to") match { + case s: JsString => s.as[String] + case o@_ => s"${o}" + } + val label = parse[String](jsValue, "label") + val timestamp = parse[Long](jsValue, "timestamp") + val direction = parse[Option[String]](jsValue, "direction").getOrElse("") + val props = (jsValue \ "props").asOpt[JsValue].getOrElse("{}") + Management.toEdge(timestamp, operation, srcId, tgtId, label, direction, props.toString) + + } + + def toVertices(jsValue: JsValue, operation: String, serviceName: Option[String] = None, columnName: Option[String] = None) = { + toJsValues(jsValue).map(toVertex(_, operation, serviceName, columnName)) + } + + def toVertex(jsValue: JsValue, operation: String, 
serviceName: Option[String] = None, columnName: Option[String] = None): Vertex = { + val id = parse[JsValue](jsValue, "id") + val ts = parse[Option[Long]](jsValue, "timestamp").getOrElse(System.currentTimeMillis()) + val sName = if (serviceName.isEmpty) parse[String](jsValue, "serviceName") else serviceName.get + val cName = if (columnName.isEmpty) parse[String](jsValue, "columnName") else columnName.get + val props = (jsValue \ "props").asOpt[JsObject].getOrElse(Json.obj()) + Management.toVertex(ts, operation, id.toString, sName, cName, props.toString) + } + + def toPropElements(jsObj: JsValue) = Try { + val propName = (jsObj \ "name").as[String] + val dataType = InnerVal.toInnerDataType((jsObj \ "dataType").as[String]) + val defaultValue = (jsObj \ "defaultValue").as[JsValue] match { + case JsString(s) => s + case _@js => js.toString + } + Prop(propName, defaultValue, dataType) + } + + def toPropsElements(jsValue: JsValue): Seq[Prop] = for { + jsObj <- jsValue.asOpt[Seq[JsValue]].getOrElse(Nil) + } yield { + val propName = (jsObj \ "name").as[String] + val dataType = InnerVal.toInnerDataType((jsObj \ "dataType").as[String]) + val defaultValue = (jsObj \ "defaultValue").as[JsValue] match { + case JsString(s) => s + case _@js => js.toString + } + Prop(propName, defaultValue, dataType) + } + + def toIndicesElements(jsValue: JsValue): Seq[Index] = for { + jsObj <- jsValue.as[Seq[JsValue]] + indexName = (jsObj \ "name").as[String] + propNames = (jsObj \ "propNames").as[Seq[String]] + } yield Index(indexName, propNames) + + def toLabelElements(jsValue: JsValue) = Try { + val labelName = parse[String](jsValue, "label") + val srcServiceName = parse[String](jsValue, "srcServiceName") + val tgtServiceName = parse[String](jsValue, "tgtServiceName") + val srcColumnName = parse[String](jsValue, "srcColumnName") + val tgtColumnName = parse[String](jsValue, "tgtColumnName") + val srcColumnType = parse[String](jsValue, "srcColumnType") + val tgtColumnType = parse[String](jsValue, "tgtColumnType") + val serviceName = (jsValue \ "serviceName").asOpt[String].getOrElse(tgtServiceName) + val isDirected = (jsValue \ "isDirected").asOpt[Boolean].getOrElse(true) + + val allProps = toPropsElements(jsValue \ "props") + val indices = toIndicesElements(jsValue \ "indices") + + val consistencyLevel = (jsValue \ "consistencyLevel").asOpt[String].getOrElse("weak") + + // expect new label don`t provide hTableName + val hTableName = (jsValue \ "hTableName").asOpt[String] + val hTableTTL = (jsValue \ "hTableTTL").asOpt[Int] + val schemaVersion = (jsValue \ "schemaVersion").asOpt[String].getOrElse(HBaseType.DEFAULT_VERSION) + val isAsync = (jsValue \ "isAsync").asOpt[Boolean].getOrElse(false) + val compressionAlgorithm = (jsValue \ "compressionAlgorithm").asOpt[String].getOrElse(DefaultCompressionAlgorithm) + + (labelName, srcServiceName, srcColumnName, srcColumnType, + tgtServiceName, tgtColumnName, tgtColumnType, isDirected, serviceName, + indices, allProps, consistencyLevel, hTableName, hTableTTL, schemaVersion, isAsync, compressionAlgorithm) + } + + def toIndexElements(jsValue: JsValue) = Try { + val labelName = parse[String](jsValue, "label") + val indices = toIndicesElements(jsValue \ "indices") + (labelName, indices) + } + + def toServiceElements(jsValue: JsValue) = { + val serviceName = parse[String](jsValue, "serviceName") + val cluster = (jsValue \ "cluster").asOpt[String].getOrElse(DefaultCluster) + val hTableName = (jsValue \ "hTableName").asOpt[String].getOrElse(s"${serviceName}-${DefaultPhase}") + val 
preSplitSize = (jsValue \ "preSplitSize").asOpt[Int].getOrElse(1) + val hTableTTL = (jsValue \ "hTableTTL").asOpt[Int] + val compressionAlgorithm = (jsValue \ "compressionAlgorithm").asOpt[String].getOrElse(DefaultCompressionAlgorithm) + (serviceName, cluster, hTableName, preSplitSize, hTableTTL, compressionAlgorithm) + } + + def toServiceColumnElements(jsValue: JsValue) = Try { + val serviceName = parse[String](jsValue, "serviceName") + val columnName = parse[String](jsValue, "columnName") + val columnType = parse[String](jsValue, "columnType") + val props = toPropsElements(jsValue \ "props") + (serviceName, columnName, columnType, props) + } + + def toCheckEdgeParam(jsValue: JsValue) = { + val params = jsValue.as[List[JsValue]] + var isReverted = false + val labelWithDirs = scala.collection.mutable.HashSet[LabelWithDirection]() + val quads = for { + param <- params + labelName <- (param \ "label").asOpt[String] + direction <- GraphUtil.toDir((param \ "direction").asOpt[String].getOrElse("out")) + label <- Label.findByName(labelName) + srcId <- jsValueToInnerVal((param \ "from").as[JsValue], label.srcColumnWithDir(direction.toInt).columnType, label.schemaVersion) + tgtId <- jsValueToInnerVal((param \ "to").as[JsValue], label.tgtColumnWithDir(direction.toInt).columnType, label.schemaVersion) + } yield { + val labelWithDir = LabelWithDirection(label.id.get, direction) + labelWithDirs += labelWithDir + val (src, tgt, dir) = if (direction == 1) { + isReverted = true + (Vertex(VertexId(label.tgtColumnWithDir(direction.toInt).id.get, tgtId)), + Vertex(VertexId(label.srcColumnWithDir(direction.toInt).id.get, srcId)), 0) + } else { + (Vertex(VertexId(label.srcColumnWithDir(direction.toInt).id.get, srcId)), + Vertex(VertexId(label.tgtColumnWithDir(direction.toInt).id.get, tgtId)), 0) + } + (src, tgt, QueryParam(LabelWithDirection(label.id.get, dir))) + } + + (quads, isReverted) + } + + def toGraphElements(str: String): Seq[GraphElement] = { + val edgeStrs = str.split("\\n") + + for { + edgeStr <- edgeStrs + str <- GraphUtil.parseString(edgeStr) + element <- Graph.toGraphElement(str) + } yield element + } + + def toDeleteParam(json: JsValue) = { + val labelName = (json \ "label").as[String] + val labels = Label.findByName(labelName).map { l => Seq(l) }.getOrElse(Nil) + val direction = (json \ "direction").asOpt[String].getOrElse("out") + + val ids = (json \ "ids").asOpt[List[JsValue]].getOrElse(Nil) + val ts = (json \ "timestamp").asOpt[Long].getOrElse(System.currentTimeMillis()) + val vertices = toVertices(labelName, direction, ids) + (labels, direction, ids, ts, vertices) + } +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/f2311f25/s2core/src/main/scala/com/kakao/s2graph/core/rest/RestCaller.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/rest/RestCaller.scala b/s2core/src/main/scala/com/kakao/s2graph/core/rest/RestCaller.scala new file mode 100644 index 0000000..bef1dec --- /dev/null +++ b/s2core/src/main/scala/com/kakao/s2graph/core/rest/RestCaller.scala @@ -0,0 +1,183 @@ +package com.kakao.s2graph.core.rest + +import java.net.URL + +import com.kakao.s2graph.core.GraphExceptions.BadQueryException +import com.kakao.s2graph.core._ +import com.kakao.s2graph.core.mysqls.{Bucket, Experiment, Service} +import com.kakao.s2graph.core.utils.logger +import play.api.libs.json._ + +import scala.concurrent.{ExecutionContext, Future} +import scala.util.Try + +/** + * Public API only return 
Future.successful or Future.failed + * Don't throw exception + */ +class RestCaller(graph: Graph)(implicit ec: ExecutionContext) { + val s2Parser = new RequestParser(graph.config) + + /** + * Public APIS + */ + def experiment(contentsBody: JsValue, accessToken: String, experimentName: String, uuid: String): Future[(JsValue, String)] = { + try { + val bucketOpt = for { + service <- Service.findByAccessToken(accessToken) + experiment <- Experiment.findBy(service.id.get, experimentName) + bucket <- experiment.findBucket(uuid) + } yield bucket + + val bucket = bucketOpt.getOrElse(throw new RuntimeException("bucket is not found")) + if (bucket.isGraphQuery) buildRequestInner(contentsBody, bucket, uuid).map(_ -> bucket.impressionId) + else throw new RuntimeException("not supported yet") + } catch { + case e: Exception => Future.failed(e) + } + } + + def uriMatch(uri: String, jsQuery: JsValue): Future[JsValue] = { + try { + uri match { + case "/graphs/getEdges" => getEdgesAsync(jsQuery)(PostProcess.toSimpleVertexArrJson) + case "/graphs/getEdges/grouped" => getEdgesAsync(jsQuery)(PostProcess.summarizeWithListFormatted) + case "/graphs/getEdgesExcluded" => getEdgesExcludedAsync(jsQuery)(PostProcess.toSimpleVertexArrJson) + case "/graphs/getEdgesExcluded/grouped" => getEdgesExcludedAsync(jsQuery)(PostProcess.summarizeWithListExcludeFormatted) + case "/graphs/checkEdges" => checkEdges(jsQuery) + case "/graphs/getEdgesGrouped" => getEdgesAsync(jsQuery)(PostProcess.summarizeWithList) + case "/graphs/getEdgesGroupedExcluded" => getEdgesExcludedAsync(jsQuery)(PostProcess.summarizeWithListExclude) + case "/graphs/getEdgesGroupedExcludedFormatted" => getEdgesExcludedAsync(jsQuery)(PostProcess.summarizeWithListExcludeFormatted) + case "/graphs/getVertices" => getVertices(jsQuery) + case _ => throw new RuntimeException("route is not found") + } + } catch { + case e: Exception => Future.failed(e) + } + } + + def checkEdges(jsValue: JsValue): Future[JsValue] = { + try { + val (quads, isReverted) = s2Parser.toCheckEdgeParam(jsValue) + + graph.checkEdges(quads).map { case queryRequestWithResultLs => + val edgeJsons = for { + queryRequestWithResult <- queryRequestWithResultLs + (queryRequest, queryResult) = QueryRequestWithResult.unapply(queryRequestWithResult).get + edgeWithScore <- queryResult.edgeWithScoreLs + (edge, score) = EdgeWithScore.unapply(edgeWithScore).get + convertedEdge = if (isReverted) edge.duplicateEdge else edge + edgeJson = PostProcess.edgeToJson(convertedEdge, score, queryRequest.query, queryRequest.queryParam) + } yield Json.toJson(edgeJson) + + Json.toJson(edgeJsons) + } + } catch { + case e: Exception => Future.failed(e) + } + } + + /** + * Private APIS + */ + private def eachQuery(post: (Seq[QueryRequestWithResult], Seq[QueryRequestWithResult]) => JsValue)(q: Query): Future[JsValue] = { + val filterOutQueryResultsLs = q.filterOutQuery match { + case Some(filterOutQuery) => graph.getEdges(filterOutQuery) + case None => Future.successful(Seq.empty) + } + + for { + queryResultsLs <- graph.getEdges(q) + filterOutResultsLs <- filterOutQueryResultsLs + } yield { + val json = post(queryResultsLs, filterOutResultsLs) + json + } + } + + private def getEdgesAsync(jsonQuery: JsValue) + (post: (Seq[QueryRequestWithResult], Seq[QueryRequestWithResult]) => JsValue): Future[JsValue] = { + + val fetch = eachQuery(post) _ + jsonQuery match { + case JsArray(arr) => Future.traverse(arr.map(s2Parser.toQuery(_)))(fetch).map(JsArray) + case obj@JsObject(_) => fetch(s2Parser.toQuery(obj)) + case _ => throw 
BadQueryException("Cannot support") + } + } + + private def getEdgesExcludedAsync(jsonQuery: JsValue) + (post: (Seq[QueryRequestWithResult], Seq[QueryRequestWithResult]) => JsValue): Future[JsValue] = { + val q = s2Parser.toQuery(jsonQuery) + val filterOutQuery = Query(q.vertices, Vector(q.steps.last)) + + val fetchFuture = graph.getEdges(q) + val excludeFuture = graph.getEdges(filterOutQuery) + + for { + queryResultLs <- fetchFuture + exclude <- excludeFuture + } yield { + post(queryResultLs, exclude) + } + } + + private def getVertices(jsValue: JsValue) = { + val jsonQuery = jsValue + val ts = System.currentTimeMillis() + val props = "{}" + + val vertices = jsonQuery.as[List[JsValue]].flatMap { js => + val serviceName = (js \ "serviceName").as[String] + val columnName = (js \ "columnName").as[String] + for (id <- (js \ "ids").asOpt[List[JsValue]].getOrElse(List.empty[JsValue])) yield { + Management.toVertex(ts, "insert", id.toString, serviceName, columnName, props) + } + } + + graph.getVertices(vertices) map { vertices => PostProcess.verticesToJson(vertices) } + } + + + private def makeRequestJson(requestKeyJsonOpt: Option[JsValue], bucket: Bucket, uuid: String): JsValue = { + var body = bucket.requestBody.replace("#uuid", uuid) + for { + requestKeyJson <- requestKeyJsonOpt + jsObj <- requestKeyJson.asOpt[JsObject] + (key, value) <- jsObj.fieldSet + } { + val replacement = value match { + case JsString(s) => s + case _ => value.toString + } + body = body.replace(key, replacement) + } + + Try(Json.parse(body)).recover { + case e: Exception => + throw new BadQueryException(s"wrong or missing template parameter: ${e.getMessage.takeWhile(_ != '\n')}") + } get + } + + private def buildRequestInner(contentsBody: JsValue, bucket: Bucket, uuid: String): Future[JsValue] = { + if (bucket.isEmpty) Future.successful(PostProcess.emptyResults) + else { + val jsonBody = makeRequestJson(Option(contentsBody), bucket, uuid) + val url = new URL(bucket.apiPath) + val path = url.getPath() + + // dummy log for sampling + val experimentLog = s"POST $path took -1 ms 200 -1 $jsonBody" + + logger.info(experimentLog) + + uriMatch(path, jsonBody) + } + } + + def calcSize(js: JsValue): Int = js match { + case JsObject(obj) => (js \ "size").asOpt[Int].getOrElse(0) + case JsArray(seq) => seq.map(js => (js \ "size").asOpt[Int].getOrElse(0)).sum + case _ => 0 + } +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/f2311f25/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/AsynchbaseQueryBuilder.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/AsynchbaseQueryBuilder.scala b/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/AsynchbaseQueryBuilder.scala index 824bb0d..717a5ac 100644 --- a/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/AsynchbaseQueryBuilder.scala +++ b/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/AsynchbaseQueryBuilder.scala @@ -90,7 +90,7 @@ class AsynchbaseQueryBuilder(storage: AsynchbaseStorage)(implicit ec: ExecutionC } val maxSize = storage.config.getInt("future.cache.max.size") - val futureCacheTTL = storage.config.getInt("future.cache.max.idle.ttl") + val futureCacheTTL = storage.config.getInt("future.cache.expire.after.access") val futureCache = CacheBuilder.newBuilder() .initialCapacity(maxSize) .concurrencyLevel(Runtime.getRuntime.availableProcessors()) 
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/f2311f25/s2core/src/main/scala/com/kakao/s2graph/core/utils/Logger.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/utils/Logger.scala b/s2core/src/main/scala/com/kakao/s2graph/core/utils/Logger.scala index 4f2ba9f..d281017 100644 --- a/s2core/src/main/scala/com/kakao/s2graph/core/utils/Logger.scala +++ b/s2core/src/main/scala/com/kakao/s2graph/core/utils/Logger.scala @@ -1,7 +1,7 @@ package com.kakao.s2graph.core.utils -import play.api.Logger import play.api.libs.json.JsValue +import org.slf4j.LoggerFactory import scala.language.{higherKinds, implicitConversions} @@ -29,8 +29,8 @@ object logger { } } - private val logger = Logger("application") - private val errorLogger = Logger("error") + private val logger = LoggerFactory.getLogger("application") + private val errorLogger = LoggerFactory.getLogger("error") def info[T: Loggable](msg: => T) = logger.info(implicitly[Loggable[T]].toLogMessage(msg)) http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/f2311f25/s2core/src/test/scala/com/kakao/s2graph/core/EdgeTest.scala ---------------------------------------------------------------------- diff --git a/s2core/src/test/scala/com/kakao/s2graph/core/EdgeTest.scala b/s2core/src/test/scala/com/kakao/s2graph/core/EdgeTest.scala index 6374d43..2aad32f 100644 --- a/s2core/src/test/scala/com/kakao/s2graph/core/EdgeTest.scala +++ b/s2core/src/test/scala/com/kakao/s2graph/core/EdgeTest.scala @@ -4,8 +4,37 @@ import com.kakao.s2graph.core.mysqls.LabelMeta import com.kakao.s2graph.core.types.{InnerVal, InnerValLikeWithTs, VertexId} import com.kakao.s2graph.core.utils.logger import org.scalatest.FunSuite +import org.scalatest.matchers.Matcher class EdgeTest extends FunSuite with TestCommon with TestCommonWithModels { + initTests() + + test("toLogString") { + val testLabelName = labelNameV2 + val bulkQueries = List( + ("1445240543366", "update", "{\"is_blocked\":true}"), + ("1445240543362", "insert", "{\"is_hidden\":false}"), + ("1445240543364", "insert", "{\"is_hidden\":false,\"weight\":10}"), + ("1445240543363", "delete", "{}"), + ("1445240543365", "update", "{\"time\":1, \"weight\":-10}")) + + val (srcId, tgtId, labelName) = ("1", "2", testLabelName) + + val bulkEdge = (for ((ts, op, props) <- bulkQueries) yield { + Management.toEdge(ts.toLong, op, srcId, tgtId, labelName, "out", props).toLogString + }).mkString("\n") + + val expected = Seq( + Seq("1445240543366", "update", "e", "1", "2", testLabelName, "{\"is_blocked\":true}"), + Seq("1445240543362", "insert", "e", "1", "2", testLabelName, "{\"is_hidden\":false}"), + Seq("1445240543364", "insert", "e", "1", "2", testLabelName, "{\"is_hidden\":false,\"weight\":10}"), + Seq("1445240543363", "delete", "e", "1", "2", testLabelName), + Seq("1445240543365", "update", "e", "1", "2", testLabelName, "{\"time\":1,\"weight\":-10}") + ).map(_.mkString("\t")).mkString("\n") + + assert(bulkEdge === expected) + } + test("buildOperation") { val schemaVersion = "v2" val vertexId = VertexId(0, InnerVal.withStr("dummy", schemaVersion)) http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/f2311f25/s2core/src/test/scala/com/kakao/s2graph/core/GraphTest.scala ---------------------------------------------------------------------- diff --git a/s2core/src/test/scala/com/kakao/s2graph/core/GraphTest.scala b/s2core/src/test/scala/com/kakao/s2graph/core/GraphTest.scala deleted file mode 100644 index 9a58bd0..0000000 
--- a/s2core/src/test/scala/com/kakao/s2graph/core/GraphTest.scala +++ /dev/null @@ -1,8 +0,0 @@ -package com.kakao.s2graph.core - -/** - * Created by shon on 5/29/15. - */ -class GraphTest { - -} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/f2311f25/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/CrudTest.scala ---------------------------------------------------------------------- diff --git a/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/CrudTest.scala b/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/CrudTest.scala new file mode 100644 index 0000000..1c09778 --- /dev/null +++ b/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/CrudTest.scala @@ -0,0 +1,226 @@ +package com.kakao.s2graph.core.Integrate + +import com.kakao.s2graph.core.mysqls._ +import play.api.libs.json.{JsObject, Json} + +class CrudTest extends IntegrateCommon { + import CrudHelper._ + import TestUtil._ + + test("test CRUD") { + var tcNum = 0 + var tcString = "" + var bulkQueries = List.empty[(Long, String, String)] + var expected = Map.empty[String, String] + + val curTime = System.currentTimeMillis + val t1 = curTime + 0 + val t2 = curTime + 1 + val t3 = curTime + 2 + val t4 = curTime + 3 + val t5 = curTime + 4 + + val tcRunner = new CrudTestRunner() + tcNum = 1 + tcString = "[t1 -> t2 -> t3 test case] insert(t1) delete(t2) insert(t3) test " + + bulkQueries = List( + (t1, "insert", "{\"time\": 10}"), + (t2, "delete", ""), + (t3, "insert", "{\"time\": 10, \"weight\": 20}")) + expected = Map("time" -> "10", "weight" -> "20") + + tcRunner.run(tcNum, tcString, bulkQueries, expected) + + tcNum = 2 + tcString = "[t1 -> t2 -> t3 test case] insert(t1) delete(t2) insert(t3) test " + bulkQueries = List( + (t1, "insert", "{\"time\": 10}"), + (t3, "insert", "{\"time\": 10, \"weight\": 20}"), + (t2, "delete", "")) + expected = Map("time" -> "10", "weight" -> "20") + + tcRunner.run(tcNum, tcString, bulkQueries, expected) + + tcNum = 3 + tcString = "[t3 -> t2 -> t1 test case] insert(t3) delete(t2) insert(t1) test " + bulkQueries = List( + (t3, "insert", "{\"time\": 10, \"weight\": 20}"), + (t2, "delete", ""), + (t1, "insert", "{\"time\": 10}")) + expected = Map("time" -> "10", "weight" -> "20") + + tcRunner.run(tcNum, tcString, bulkQueries, expected) + + tcNum = 4 + tcString = "[t3 -> t1 -> t2 test case] insert(t3) insert(t1) delete(t2) test " + bulkQueries = List( + (t3, "insert", "{\"time\": 10, \"weight\": 20}"), + (t1, "insert", "{\"time\": 10}"), + (t2, "delete", "")) + expected = Map("time" -> "10", "weight" -> "20") + + tcRunner.run(tcNum, tcString, bulkQueries, expected) + + tcNum = 5 + tcString = "[t2 -> t1 -> t3 test case] delete(t2) insert(t1) insert(t3) test" + bulkQueries = List( + (t2, "delete", ""), + (t1, "insert", "{\"time\": 10}"), + (t3, "insert", "{\"time\": 10, \"weight\": 20}")) + expected = Map("time" -> "10", "weight" -> "20") + + tcRunner.run(tcNum, tcString, bulkQueries, expected) + + tcNum = 6 + tcString = "[t2 -> t3 -> t1 test case] delete(t2) insert(t3) insert(t1) test " + bulkQueries = List( + (t2, "delete", ""), + (t3, "insert", "{\"time\": 10, \"weight\": 20}"), + (t1, "insert", "{\"time\": 10}")) + expected = Map("time" -> "10", "weight" -> "20") + + tcRunner.run(tcNum, tcString, bulkQueries, expected) + + tcNum = 7 + tcString = "[t1 -> t2 -> t3 test case] update(t1) delete(t2) update(t3) test " + bulkQueries = List( + (t1, "update", "{\"time\": 10}"), + (t2, "delete", ""), + (t3, "update", "{\"time\": 10, \"weight\": 20}")) + expected = Map("time" -> "10", 
"weight" -> "20") + + tcRunner.run(tcNum, tcString, bulkQueries, expected) + tcNum = 8 + tcString = "[t1 -> t3 -> t2 test case] update(t1) update(t3) delete(t2) test " + bulkQueries = List( + (t1, "update", "{\"time\": 10}"), + (t3, "update", "{\"time\": 10, \"weight\": 20}"), + (t2, "delete", "")) + expected = Map("time" -> "10", "weight" -> "20") + + tcRunner.run(tcNum, tcString, bulkQueries, expected) + tcNum = 9 + tcString = "[t2 -> t1 -> t3 test case] delete(t2) update(t1) update(t3) test " + bulkQueries = List( + (t2, "delete", ""), + (t1, "update", "{\"time\": 10}"), + (t3, "update", "{\"time\": 10, \"weight\": 20}")) + expected = Map("time" -> "10", "weight" -> "20") + + tcRunner.run(tcNum, tcString, bulkQueries, expected) + tcNum = 10 + tcString = "[t2 -> t3 -> t1 test case] delete(t2) update(t3) update(t1) test" + bulkQueries = List( + (t2, "delete", ""), + (t3, "update", "{\"time\": 10, \"weight\": 20}"), + (t1, "update", "{\"time\": 10}")) + expected = Map("time" -> "10", "weight" -> "20") + + tcRunner.run(tcNum, tcString, bulkQueries, expected) + tcNum = 11 + tcString = "[t3 -> t2 -> t1 test case] update(t3) delete(t2) update(t1) test " + bulkQueries = List( + (t3, "update", "{\"time\": 10, \"weight\": 20}"), + (t2, "delete", ""), + (t1, "update", "{\"time\": 10}")) + expected = Map("time" -> "10", "weight" -> "20") + + tcRunner.run(tcNum, tcString, bulkQueries, expected) + tcNum = 12 + tcString = "[t3 -> t1 -> t2 test case] update(t3) update(t1) delete(t2) test " + bulkQueries = List( + (t3, "update", "{\"time\": 10, \"weight\": 20}"), + (t1, "update", "{\"time\": 10}"), + (t2, "delete", "")) + expected = Map("time" -> "10", "weight" -> "20") + + tcRunner.run(tcNum, tcString, bulkQueries, expected) + + tcNum = 13 + tcString = "[t5 -> t1 -> t3 -> t2 -> t4 test case] update(t5) insert(t1) insert(t3) delete(t2) update(t4) test " + bulkQueries = List( + (t5, "update", "{\"is_blocked\": true}"), + (t1, "insert", "{\"is_hidden\": false}"), + (t3, "insert", "{\"is_hidden\": false, \"weight\": 10}"), + (t2, "delete", ""), + (t4, "update", "{\"time\": 1, \"weight\": -10}")) + expected = Map("time" -> "1", "weight" -> "-10", "is_hidden" -> "false", "is_blocked" -> "true") + + tcRunner.run(tcNum, tcString, bulkQueries, expected) + } + + + object CrudHelper { + + class CrudTestRunner { + var seed = 0 + + def run(tcNum: Int, tcString: String, opWithProps: List[(Long, String, String)], expected: Map[String, String]) = { + for { + labelName <- List(testLabelName, testLabelName2) + i <- 0 until NumOfEachTest + } { + seed += 1 + val srcId = seed.toString + val tgtId = srcId + + val maxTs = opWithProps.map(t => t._1).max + + /** insert edges */ + println(s"---- TC${tcNum}_init ----") + val bulkEdges = (for ((ts, op, props) <- opWithProps) yield { + TestUtil.toEdge(ts, op, "e", srcId, tgtId, labelName, props) + }) + + TestUtil.insertEdgesSync(bulkEdges: _*) + + for { + label <- Label.findByName(labelName) + direction <- List("out", "in") + cacheTTL <- List(-1L) + } { + val (serviceName, columnName, id, otherId) = direction match { + case "out" => (label.srcService.serviceName, label.srcColumn.columnName, srcId, tgtId) + case "in" => (label.tgtService.serviceName, label.tgtColumn.columnName, tgtId, srcId) + } + + val qId = if (labelName == testLabelName) id else "\"" + id + "\"" + val query = queryJson(serviceName, columnName, labelName, qId, direction, cacheTTL) + + val jsResult = TestUtil.getEdgesSync(query) + + val results = jsResult \ "results" + val deegrees = (jsResult \ 
"degrees").as[List[JsObject]] + val propsLs = (results \\ "props").seq + (deegrees.head \ LabelMeta.degree.name).as[Int] should be(1) + + val from = (results \\ "from").seq.last.toString.replaceAll("\"", "") + val to = (results \\ "to").seq.last.toString.replaceAll("\"", "") + + from should be(id.toString) + to should be(otherId.toString) + (results \\ "_timestamp").seq.last.as[Long] should be(maxTs) + + for ((key, expectedVal) <- expected) { + propsLs.last.as[JsObject].keys.contains(key) should be(true) + (propsLs.last \ key).toString should be(expectedVal) + } + } + } + } + + def queryJson(serviceName: String, columnName: String, labelName: String, id: String, dir: String, cacheTTL: Long = -1L) = Json.parse( + s""" { "srcVertices": [ + { "serviceName": "$serviceName", + "columnName": "$columnName", + "id": $id } ], + "steps": [ [ { + "label": "$labelName", + "direction": "$dir", + "offset": 0, + "limit": 10, + "cacheTTL": $cacheTTL }]]}""") + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/f2311f25/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/IntegrateCommon.scala ---------------------------------------------------------------------- diff --git a/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/IntegrateCommon.scala b/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/IntegrateCommon.scala new file mode 100644 index 0000000..0ced48f --- /dev/null +++ b/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/IntegrateCommon.scala @@ -0,0 +1,311 @@ +package com.kakao.s2graph.core.Integrate + +import com.kakao.s2graph.core._ +import com.kakao.s2graph.core.mysqls.Label +import com.kakao.s2graph.core.rest.RequestParser +import com.kakao.s2graph.core.utils.logger +import com.typesafe.config._ +import org.scalatest._ +import play.api.libs.json.{JsValue, Json} + +import scala.concurrent.duration.Duration +import scala.concurrent.{Await, ExecutionContext, Future} + +trait IntegrateCommon extends FunSuite with Matchers with BeforeAndAfterAll { + + import TestUtil._ + + var graph: Graph = _ + var parser: RequestParser = _ + var config: Config = _ + + override def beforeAll = { + config = ConfigFactory.load() + graph = new Graph(config)(ExecutionContext.Implicits.global) + parser = new RequestParser(graph.config) + initTestData() + } + + override def afterAll(): Unit = { + graph.shutdown() + } + + /** + * Make Service, Label, Vertex for integrate test + */ + def initTestData() = { + println("[init start]: >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>") + Management.deleteService(testServiceName) + + // 1. 
createService + val jsValue = Json.parse(createService) + val (serviceName, cluster, tableName, preSplitSize, ttl, compressionAlgorithm) = + parser.toServiceElements(jsValue) + + val tryRes = + Management.createService(serviceName, cluster, tableName, preSplitSize, ttl, compressionAlgorithm) + println(s">> Service created : $createService, $tryRes") + + val labelNames = Map(testLabelName -> testLabelNameCreate, + testLabelName2 -> testLabelName2Create, + testLabelNameV1 -> testLabelNameV1Create, + testLabelNameWeak -> testLabelNameWeakCreate) + + for { + (labelName, create) <- labelNames + } { + Management.deleteLabel(labelName) + Label.findByName(labelName, useCache = false) match { + case None => + val json = Json.parse(create) + val tryRes = for { + labelArgs <- parser.toLabelElements(json) + label <- (Management.createLabel _).tupled(labelArgs) + } yield label + + tryRes.get + case Some(label) => + println(s">> Label already exist: $create, $label") + } + } + + val vertexPropsKeys = List("age" -> "int") + + vertexPropsKeys.map { case (key, keyType) => + Management.addVertexProp(testServiceName, testColumnName, key, keyType) + } + + println("[init end]: >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>") + } + + + /** + * Test Helpers + */ + object TestUtil { + implicit def ec = scala.concurrent.ExecutionContext.global + + // def checkEdgeQueryJson(params: Seq[(String, String, String, String)]) = { + // val arr = for { + // (label, dir, from, to) <- params + // } yield { + // Json.obj("label" -> label, "direction" -> dir, "from" -> from, "to" -> to) + // } + // + // val s = Json.toJson(arr) + // s + // } + + def deleteAllSync(jsValue: JsValue) = { + val future = Future.sequence(jsValue.as[Seq[JsValue]] map { json => + val (labels, direction, ids, ts, vertices) = parser.toDeleteParam(json) + val future = graph.deleteAllAdjacentEdges(vertices.toList, labels, GraphUtil.directions(direction), ts) + + future + }) + + Await.result(future, HttpRequestWaitingTime) + } + + def getEdgesSync(queryJson: JsValue): JsValue = { + logger.info(Json.prettyPrint(queryJson)) + + val ret = graph.getEdges(parser.toQuery(queryJson)) + val result = Await.result(ret, HttpRequestWaitingTime) + val jsResult = PostProcess.toSimpleVertexArrJson(result) + + jsResult + } + + def insertEdgesSync(bulkEdges: String*) = { + val req = graph.mutateElements(parser.toGraphElements(bulkEdges.mkString("\n")), withWait = true) + val jsResult = Await.result(req, HttpRequestWaitingTime) + + jsResult + } + + def insertEdgesAsync(bulkEdges: String*) = { + val req = graph.mutateElements(parser.toGraphElements(bulkEdges.mkString("\n")), withWait = true) + req + } + + def toEdge(elems: Any*): String = elems.mkString("\t") + + // common tables + val testServiceName = "s2graph" + val testLabelName = "s2graph_label_test" + val testLabelName2 = "s2graph_label_test_2" + val testLabelNameV1 = "s2graph_label_test_v1" + val testLabelNameWeak = "s2graph_label_test_weak" + val testColumnName = "user_id_test" + val testColumnType = "long" + val testTgtColumnName = "item_id_test" + val testHTableName = "test-htable" + val newHTableName = "new-htable" + val index1 = "idx_1" + val index2 = "idx_2" + + val NumOfEachTest = 30 + val HttpRequestWaitingTime = Duration("60 seconds") + + val createService = s"""{"serviceName" : "$testServiceName"}""" + + val testLabelNameCreate = + s""" + { + "label": "$testLabelName", + "srcServiceName": "$testServiceName", + "srcColumnName": "$testColumnName", + "srcColumnType": "long", + "tgtServiceName": 
"$testServiceName", + "tgtColumnName": "$testColumnName", + "tgtColumnType": "long", + "indices": [ + {"name": "$index1", "propNames": ["weight", "time", "is_hidden", "is_blocked"]}, + {"name": "$index2", "propNames": ["_timestamp"]} + ], + "props": [ + { + "name": "time", + "dataType": "long", + "defaultValue": 0 + }, + { + "name": "weight", + "dataType": "long", + "defaultValue": 0 + }, + { + "name": "is_hidden", + "dataType": "boolean", + "defaultValue": false + }, + { + "name": "is_blocked", + "dataType": "boolean", + "defaultValue": false + } + ], + "consistencyLevel": "strong", + "schemaVersion": "v2", + "compressionAlgorithm": "gz", + "hTableName": "$testHTableName" + }""" + + val testLabelName2Create = + s""" + { + "label": "$testLabelName2", + "srcServiceName": "$testServiceName", + "srcColumnName": "$testColumnName", + "srcColumnType": "long", + "tgtServiceName": "$testServiceName", + "tgtColumnName": "$testTgtColumnName", + "tgtColumnType": "string", + "indices": [{"name": "$index1", "propNames": ["time", "weight", "is_hidden", "is_blocked"]}], + "props": [ + { + "name": "time", + "dataType": "long", + "defaultValue": 0 + }, + { + "name": "weight", + "dataType": "long", + "defaultValue": 0 + }, + { + "name": "is_hidden", + "dataType": "boolean", + "defaultValue": false + }, + { + "name": "is_blocked", + "dataType": "boolean", + "defaultValue": false + } + ], + "consistencyLevel": "strong", + "isDirected": false, + "schemaVersion": "v3", + "compressionAlgorithm": "gz" + }""" + + val testLabelNameV1Create = + s""" + { + "label": "$testLabelNameV1", + "srcServiceName": "$testServiceName", + "srcColumnName": "$testColumnName", + "srcColumnType": "long", + "tgtServiceName": "$testServiceName", + "tgtColumnName": "${testTgtColumnName}_v1", + "tgtColumnType": "string", + "indices": [{"name": "$index1", "propNames": ["time", "weight", "is_hidden", "is_blocked"]}], + "props": [ + { + "name": "time", + "dataType": "long", + "defaultValue": 0 + }, + { + "name": "weight", + "dataType": "long", + "defaultValue": 0 + }, + { + "name": "is_hidden", + "dataType": "boolean", + "defaultValue": false + }, + { + "name": "is_blocked", + "dataType": "boolean", + "defaultValue": false + } + ], + "consistencyLevel": "strong", + "isDirected": true, + "schemaVersion": "v1", + "compressionAlgorithm": "gz" + }""" + + val testLabelNameWeakCreate = + s""" + { + "label": "$testLabelNameWeak", + "srcServiceName": "$testServiceName", + "srcColumnName": "$testColumnName", + "srcColumnType": "long", + "tgtServiceName": "$testServiceName", + "tgtColumnName": "$testTgtColumnName", + "tgtColumnType": "string", + "indices": [{"name": "$index1", "propNames": ["time", "weight", "is_hidden", "is_blocked"]}], + "props": [ + { + "name": "time", + "dataType": "long", + "defaultValue": 0 + }, + { + "name": "weight", + "dataType": "long", + "defaultValue": 0 + }, + { + "name": "is_hidden", + "dataType": "boolean", + "defaultValue": false + }, + { + "name": "is_blocked", + "dataType": "boolean", + "defaultValue": false + } + ], + "consistencyLevel": "weak", + "isDirected": true, + "compressionAlgorithm": "gz" + }""" + } +}

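For reference, a new integrate test built on these helpers would look roughly like the following. The class and its label choice are hypothetical (WeakLabelInsertTest is not part of this commit); the sketch only assumes the TestUtil helpers (toEdge, insertEdgesSync, getEdgesSync) and the service/label/column names defined in IntegrateCommon above.

package com.kakao.s2graph.core.Integrate

import play.api.libs.json.Json

// Illustrative only: a companion test that reuses the helpers added in IntegrateCommon.
class WeakLabelInsertTest extends IntegrateCommon {
  import TestUtil._

  test("an edge inserted on the weak label is visible to a query") {
    val ts = System.currentTimeMillis()

    // bulk format built by toEdge: ts \t op \t "e" \t srcId \t tgtId \t label \t props
    insertEdgesSync(toEdge(ts, "insert", "e", "1", "foo", testLabelNameWeak, "{\"time\": 10}"))

    val query = Json.parse(
      s"""{
         |  "srcVertices": [
         |    {"serviceName": "$testServiceName", "columnName": "$testColumnName", "id": 1}
         |  ],
         |  "steps": [[
         |    {"label": "$testLabelNameWeak", "direction": "out", "offset": 0, "limit": 10}
         |  ]]
         |}""".stripMargin)

    val result = getEdgesSync(query)
    (result \\ "to").map(_.toString.replaceAll("\"", "")) should contain("foo")
  }
}

Because insertEdgesSync calls mutateElements with withWait = true, the query and the assertion can run immediately after the mutation without polling for visibility.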