http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/f2311f25/app/controllers/PostProcess.scala ---------------------------------------------------------------------- diff --git a/app/controllers/PostProcess.scala b/app/controllers/PostProcess.scala deleted file mode 100644 index bbc7266..0000000 --- a/app/controllers/PostProcess.scala +++ /dev/null @@ -1,432 +0,0 @@ -package controllers - -import com.kakao.s2graph.core._ -import com.kakao.s2graph.core.mysqls._ -import com.kakao.s2graph.core.types.{InnerVal, InnerValLike} -import play.api.libs.json.{Json, _} - -import scala.collection.mutable.ListBuffer - -object PostProcess extends JSONParser { - /** - * Result Entity score field name - */ - val SCORE_FIELD_NAME = "scoreSum" - val timeoutResults = Json.obj("size" -> 0, "results" -> Json.arr(), "isTimeout" -> true) - val reservedColumns = Set("cacheRemain", "from", "to", "label", "direction", "_timestamp", "timestamp", "score", "props") - - def groupEdgeResult(queryRequestWithResultLs: Seq[QueryRequestWithResult], exclude: Seq[QueryRequestWithResult]) = { - val excludeIds = resultInnerIds(exclude).map(innerId => innerId -> true).toMap - // filterNot {case (edge, score) => edge.props.contains(LabelMeta.degreeSeq)} - val groupedEdgesWithRank = (for { - queryRequestWithResult <- queryRequestWithResultLs - (queryRequest, queryResult) = QueryRequestWithResult.unapply(queryRequestWithResult).get - edgeWithScore <- queryResult.edgeWithScoreLs - (edge, score) = EdgeWithScore.unapply(edgeWithScore).get - if !excludeIds.contains(toHashKey(edge, queryRequest.queryParam, queryRequest.query.filterOutFields)) - } yield { - (queryRequest.queryParam, edge, score) - }).groupBy { - case (queryParam, edge, rank) if edge.labelWithDir.dir == GraphUtil.directions("in") => - (queryParam.label.srcColumn, queryParam.label.label, queryParam.label.tgtColumn, edge.tgtVertex.innerId, edge.isDegree) - case (queryParam, edge, rank) => - (queryParam.label.tgtColumn, queryParam.label.label, queryParam.label.srcColumn, edge.tgtVertex.innerId, edge.isDegree) - } - - val ret = for { - ((tgtColumn, labelName, srcColumn, target, isDegreeEdge), edgesAndRanks) <- groupedEdgesWithRank if !isDegreeEdge - edgesWithRanks = edgesAndRanks.groupBy(x => x._2.srcVertex).map(_._2.head) - id <- innerValToJsValue(target, tgtColumn.columnType) - } yield { - Json.obj("name" -> tgtColumn.columnName, "id" -> id, - SCORE_FIELD_NAME -> edgesWithRanks.map(_._3).sum, - "label" -> labelName, - "aggr" -> Json.obj( - "name" -> srcColumn.columnName, - "ids" -> edgesWithRanks.flatMap { case (queryParam, edge, rank) => - innerValToJsValue(edge.srcVertex.innerId, srcColumn.columnType) - }, - "edges" -> edgesWithRanks.map { case (queryParam, edge, rank) => - Json.obj("id" -> innerValToJsValue(edge.srcVertex.innerId, srcColumn.columnType), - "props" -> propsToJson(edge), - "score" -> rank - ) - } - ) - ) - } - - ret.toList - } - - def sortWithFormatted(jsons: Seq[JsObject], - scoreField: String = "scoreSum", - queryRequestWithResultLs: Seq[QueryRequestWithResult], - decrease: Boolean = true): JsObject = { - val ordering = if (decrease) -1 else 1 - val sortedJsons = jsons.sortBy { jsObject => (jsObject \ scoreField).as[Double] * ordering } - - if (queryRequestWithResultLs.isEmpty) Json.obj("size" -> sortedJsons.size, "results" -> sortedJsons) - else Json.obj( - "size" -> sortedJsons.size, - "results" -> sortedJsons, - "impressionId" -> queryRequestWithResultLs.head.queryRequest.query.impressionId() - ) - } - - private def toHashKey(edge: Edge, queryParam: QueryParam, fields: Seq[String], delimiter: String = ","): Int = { - val ls = for { - field <- fields - } yield { - field match { - case "from" | "_from" => edge.srcVertex.innerId - case "to" | "_to" => edge.tgtVertex.innerId - case "label" => edge.labelWithDir.labelId - case "direction" => JsString(GraphUtil.fromDirection(edge.labelWithDir.dir)) - case "_timestamp" | "timestamp" => edge.ts - case _ => - queryParam.label.metaPropsInvMap.get(field) match { - case None => throw new RuntimeException(s"unknow column: $field") - case Some(labelMeta) => edge.propsWithTs.get(labelMeta.seq) match { - case None => labelMeta.defaultValue - case Some(propVal) => propVal - } - } - } - } - val ret = ls.hashCode() - ret - } - - def resultInnerIds(queryRequestWithResultLs: Seq[QueryRequestWithResult], isSrcVertex: Boolean = false): Seq[Int] = { - for { - queryRequestWithResult <- queryRequestWithResultLs - (queryRequest, queryResult) = QueryRequestWithResult.unapply(queryRequestWithResult).get - q = queryRequest.query - edgeWithScore <- queryResult.edgeWithScoreLs - (edge, score) = EdgeWithScore.unapply(edgeWithScore).get - } yield toHashKey(edge, queryRequest.queryParam, q.filterOutFields) - } - - def summarizeWithListExcludeFormatted(queryRequestWithResultLs: Seq[QueryRequestWithResult], exclude: Seq[QueryRequestWithResult]) = { - val jsons = groupEdgeResult(queryRequestWithResultLs, exclude) - sortWithFormatted(jsons, queryRequestWithResultLs = queryRequestWithResultLs, decrease = true) - } - - def summarizeWithList(queryRequestWithResultLs: Seq[QueryRequestWithResult], exclude: Seq[QueryRequestWithResult]) = { - val jsons = groupEdgeResult(queryRequestWithResultLs, exclude) - sortWithFormatted(jsons, queryRequestWithResultLs = queryRequestWithResultLs) - } - - def summarizeWithListFormatted(queryRequestWithResultLs: Seq[QueryRequestWithResult], exclude: Seq[QueryRequestWithResult]) = { - val jsons = groupEdgeResult(queryRequestWithResultLs, exclude) - sortWithFormatted(jsons, queryRequestWithResultLs = queryRequestWithResultLs) - } - - def toSimpleVertexArrJson(queryRequestWithResultLs: Seq[QueryRequestWithResult]): JsValue = { - toSimpleVertexArrJson(queryRequestWithResultLs, Seq.empty[QueryRequestWithResult]) - } - - private def orderBy(q: Query, - orderByColumns: Seq[(String, Boolean)], - rawEdges: ListBuffer[(Map[String, JsValue], Double, (Any, Any, Any, Any))]) - : ListBuffer[(Map[String, JsValue], Double, (Any, Any, Any, Any))] = { - import com.kakao.s2graph.core.OrderingUtil._ - - if (q.withScore && orderByColumns.nonEmpty) { - val ascendingLs = orderByColumns.map(_._2) - rawEdges.sortBy(_._3)(new TupleMultiOrdering[Any](ascendingLs)) - } else { - rawEdges - } - } - - private def getColumnValue(keyWithJs: Map[String, JsValue], score: Double, edge: Edge, column: String): Any = { - column match { - case "score" => score - case "timestamp" | "_timestamp" => edge.ts - case _ => - keyWithJs.get(column) match { - case None => keyWithJs.get("props").map { js => (js \ column).as[JsValue] }.get - case Some(x) => x - } - } - } - - def toSimpleVertexArrJson(queryRequestWithResultLs: Seq[QueryRequestWithResult], exclude: Seq[QueryRequestWithResult]): JsValue = { - val excludeIds = resultInnerIds(exclude).map(innerId => innerId -> true).toMap - - val degrees = ListBuffer[JsValue]() - val rawEdges = ListBuffer[(Map[String, JsValue], Double, (Any, Any, Any, Any))]() - - if (queryRequestWithResultLs.isEmpty) { - Json.obj("size" -> 0, "degrees" -> Json.arr(), "results" -> Json.arr()) - } else { - val (queryRequest, queryResult) = QueryRequestWithResult.unapply(queryRequestWithResultLs.head).get - val query = queryRequest.query - val queryParam = queryRequest.queryParam - - val orderByColumns = query.orderByColumns.filter { case (column, _) => - column match { - case "from" | "to" | "label" | "score" | "timestamp" | "_timestamp" => true - case _ => - queryParam.label.metaPropNames.contains(column) - } - } - - /** build result jsons */ - for { - queryRequestWithResult <- queryRequestWithResultLs - (queryRequest, queryResult) = QueryRequestWithResult.unapply(queryRequestWithResult).get - queryParam = queryRequest.queryParam - edgeWithScore <- queryResult.edgeWithScoreLs - (edge, score) = EdgeWithScore.unapply(edgeWithScore).get - if !excludeIds.contains(toHashKey(edge, queryRequest.queryParam, query.filterOutFields)) - } { - // edge to json - val (srcColumn, _) = queryParam.label.srcTgtColumn(edge.labelWithDir.dir) - val fromOpt = innerValToJsValue(edge.srcVertex.id.innerId, srcColumn.columnType) - if (edge.isDegree && fromOpt.isDefined) { - degrees += Json.obj( - "from" -> fromOpt.get, - "label" -> queryRequest.queryParam.label.label, - "direction" -> GraphUtil.fromDirection(edge.labelWithDir.dir), - LabelMeta.degree.name -> innerValToJsValue(edge.propsWithTs(LabelMeta.degreeSeq).innerVal, InnerVal.LONG) - ) - } else { - val keyWithJs = edgeToJson(edge, score, queryRequest.query, queryRequest.queryParam) - val orderByValues: (Any, Any, Any, Any) = orderByColumns.length match { - case 0 => - (None, None, None, None) - case 1 => - val it = orderByColumns.iterator - val v1 = getColumnValue(keyWithJs, score, edge, it.next()._1) - (v1, None, None, None) - case 2 => - val it = orderByColumns.iterator - val v1 = getColumnValue(keyWithJs, score, edge, it.next()._1) - val v2 = getColumnValue(keyWithJs, score, edge, it.next()._1) - (v1, v2, None, None) - case 3 => - val it = orderByColumns.iterator - val v1 = getColumnValue(keyWithJs, score, edge, it.next()._1) - val v2 = getColumnValue(keyWithJs, score, edge, it.next()._1) - val v3 = getColumnValue(keyWithJs, score, edge, it.next()._1) - (v1, v2, v3, None) - case _ => - val it = orderByColumns.iterator - val v1 = getColumnValue(keyWithJs, score, edge, it.next()._1) - val v2 = getColumnValue(keyWithJs, score, edge, it.next()._1) - val v3 = getColumnValue(keyWithJs, score, edge, it.next()._1) - val v4 = getColumnValue(keyWithJs, score, edge, it.next()._1) - (v1, v2, v3, v4) - } - - val currentEdge = (keyWithJs, score, orderByValues) - rawEdges += currentEdge - } - } - - if (query.groupByColumns.isEmpty) { - // ordering - val edges = orderBy(query, orderByColumns, rawEdges).map(_._1) - - Json.obj( - "size" -> edges.size, - "degrees" -> degrees, - "results" -> edges, - "impressionId" -> query.impressionId() - ) - } else { - val grouped = rawEdges.groupBy { case (keyWithJs, _, _) => - val props = keyWithJs.get("props") - - for { - column <- query.groupByColumns - value <- keyWithJs.get(column) match { - case None => props.flatMap { js => (js \ column).asOpt[JsValue] } - case Some(x) => Some(x) - } - } yield column -> value - } - - val groupedEdges = { - for { - (groupByKeyVals, groupedRawEdges) <- grouped - } yield { - val scoreSum = groupedRawEdges.map(x => x._2).sum - // ordering - val edges = orderBy(query, orderByColumns, groupedRawEdges).map(_._1) - Json.obj( - "groupBy" -> Json.toJson(groupByKeyVals.toMap), - "scoreSum" -> scoreSum, - "agg" -> edges - ) - } - } - - val groupedSortedJsons = groupedEdges.toList.sortBy { jsVal => -1 * (jsVal \ "scoreSum").as[Double] } - Json.obj( - "size" -> groupedEdges.size, - "degrees" -> degrees, - "results" -> groupedSortedJsons, - "impressionId" -> query.impressionId() - ) - } - } - } - - def verticesToJson(vertices: Iterable[Vertex]) = { - Json.toJson(vertices.flatMap { v => vertexToJson(v) }) - } - - def propsToJson(edge: Edge, q: Query, queryParam: QueryParam): Map[String, JsValue] = { - for { - (seq, innerValWithTs) <- edge.propsWithTs if LabelMeta.isValidSeq(seq) - labelMeta <- queryParam.label.metaPropsMap.get(seq) - jsValue <- innerValToJsValue(innerValWithTs.innerVal, labelMeta.dataType) - } yield labelMeta.name -> jsValue - } - - private def edgeParent(parentEdges: Seq[EdgeWithScore], q: Query, queryParam: QueryParam): JsValue = { - if (parentEdges.isEmpty) { - JsNull - } else { - val parents = for { - parent <- parentEdges - (parentEdge, parentScore) = EdgeWithScore.unapply(parent).get - parentQueryParam = QueryParam(parentEdge.labelWithDir) - parents = edgeParent(parentEdge.parentEdges, q, parentQueryParam) if parents != JsNull - } yield { - val originalEdge = parentEdge.originalEdgeOpt.getOrElse(parentEdge) - val edgeJson = edgeToJsonInner(originalEdge, parentScore, q, parentQueryParam) + ("parents" -> parents) - Json.toJson(edgeJson) - } - - Json.toJson(parents) - } - } - - /** TODO */ - def edgeToJsonInner(edge: Edge, score: Double, q: Query, queryParam: QueryParam): Map[String, JsValue] = { - val (srcColumn, tgtColumn) = queryParam.label.srcTgtColumn(edge.labelWithDir.dir) - - val kvMapOpt = for { - from <- innerValToJsValue(edge.srcVertex.id.innerId, srcColumn.columnType) - to <- innerValToJsValue(edge.tgtVertex.id.innerId, tgtColumn.columnType) - } yield { - val targetColumns = if (q.selectColumnsSet.isEmpty) reservedColumns else (reservedColumns & q.selectColumnsSet) + "props" - - val _propsMap = queryParam.label.metaPropsDefaultMapInner ++ propsToJson(edge, q, queryParam) - val propsMap = if (q.selectColumnsSet.nonEmpty) _propsMap.filterKeys(q.selectColumnsSet) else _propsMap - - val kvMap = targetColumns.foldLeft(Map.empty[String, JsValue]) { (map, column) => - val jsValue = column match { - case "cacheRemain" => JsNumber(queryParam.cacheTTLInMillis - (System.currentTimeMillis() - queryParam.timestamp)) - case "from" => from - case "to" => to - case "label" => JsString(queryParam.label.label) - case "direction" => JsString(GraphUtil.fromDirection(edge.labelWithDir.dir)) - case "_timestamp" | "timestamp" => JsNumber(edge.ts) - case "score" => JsNumber(score) - case "props" if propsMap.nonEmpty => Json.toJson(propsMap) - case _ => JsNull - } - - if (jsValue == JsNull) map else map + (column -> jsValue) - } - kvMap - } - - kvMapOpt.getOrElse(Map.empty) - } - - def edgeToJson(edge: Edge, score: Double, q: Query, queryParam: QueryParam): Map[String, JsValue] = { - val kvs = edgeToJsonInner(edge, score, q, queryParam) - if (kvs.nonEmpty && q.returnTree) kvs + ("parents" -> Json.toJson(edgeParent(edge.parentEdges, q, queryParam))) - else kvs - } - - def vertexToJson(vertex: Vertex): Option[JsObject] = { - val serviceColumn = ServiceColumn.findById(vertex.id.colId) - - for { - id <- innerValToJsValue(vertex.innerId, serviceColumn.columnType) - } yield { - Json.obj("serviceName" -> serviceColumn.service.serviceName, - "columnName" -> serviceColumn.columnName, - "id" -> id, "props" -> propsToJson(vertex), - "timestamp" -> vertex.ts, -// "belongsTo" -> vertex.belongLabelIds) - "belongsTo" -> vertex.belongLabelIds.flatMap(Label.findByIdOpt(_).map(_.label))) - } - } - - private def keysToName(seqsToNames: Map[Int, String], props: Map[Int, InnerValLike]) = { - for { - (seq, value) <- props - name <- seqsToNames.get(seq) - } yield (name, value) - } - - private def propsToJson(vertex: Vertex) = { - val serviceColumn = vertex.serviceColumn - val props = for { - (propKey, innerVal) <- vertex.props - columnMeta <- ColumnMeta.findByIdAndSeq(serviceColumn.id.get, propKey.toByte, useCache = true) - jsValue <- innerValToJsValue(innerVal, columnMeta.dataType) - } yield { - (columnMeta.name -> jsValue) - } - props.toMap - } - - @deprecated(message = "deprecated", since = "0.2") - def propsToJson(edge: Edge) = { - for { - (seq, v) <- edge.propsWithTs if LabelMeta.isValidSeq(seq) - metaProp <- edge.label.metaPropsMap.get(seq) - jsValue <- innerValToJsValue(v.innerVal, metaProp.dataType) - } yield { - (metaProp.name, jsValue) - } - } - - @deprecated(message = "deprecated", since = "0.2") - def summarizeWithListExclude(queryRequestWithResultLs: Seq[QueryRequestWithResult], exclude: Seq[QueryRequestWithResult]): JsObject = { - val excludeIds = resultInnerIds(exclude).map(innerId => innerId -> true).toMap - - - val groupedEdgesWithRank = (for { - queryRequestWithResult <- queryRequestWithResultLs - (queryRequest, queryResult) = QueryRequestWithResult.unapply(queryRequestWithResult).get - edgeWithScore <- queryResult.edgeWithScoreLs - (edge, score) = EdgeWithScore.unapply(edgeWithScore).get - if !excludeIds.contains(toHashKey(edge, queryRequest.queryParam, queryRequest.query.filterOutFields)) - } yield { - (edge, score) - }).groupBy { case (edge, score) => - (edge.label.tgtColumn, edge.label.srcColumn, edge.tgtVertex.innerId) - } - - val jsons = for { - ((tgtColumn, srcColumn, target), edgesAndRanks) <- groupedEdgesWithRank - (edges, ranks) = edgesAndRanks.groupBy(x => x._1.srcVertex).map(_._2.head).unzip - tgtId <- innerValToJsValue(target, tgtColumn.columnType) - } yield { - Json.obj(tgtColumn.columnName -> tgtId, - s"${srcColumn.columnName}s" -> - edges.flatMap(edge => innerValToJsValue(edge.srcVertex.innerId, srcColumn.columnType)), "scoreSum" -> ranks.sum) - } - val sortedJsons = jsons.toList.sortBy { jsObj => (jsObj \ "scoreSum").as[Double] }.reverse - if (queryRequestWithResultLs.isEmpty) { - Json.obj("size" -> sortedJsons.size, "results" -> sortedJsons) - } else { - Json.obj("size" -> sortedJsons.size, "results" -> sortedJsons, - "impressionId" -> queryRequestWithResultLs.head.queryRequest.query.impressionId()) - } - - } - - -}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/f2311f25/app/controllers/PublishController.scala ---------------------------------------------------------------------- diff --git a/app/controllers/PublishController.scala b/app/controllers/PublishController.scala deleted file mode 100644 index b1495d5..0000000 --- a/app/controllers/PublishController.scala +++ /dev/null @@ -1,54 +0,0 @@ -package controllers - -import com.kakao.s2graph.core.ExceptionHandler -import config.Config -import org.apache.kafka.clients.producer.ProducerRecord -import play.api.mvc._ - -import scala.concurrent.Future - -object PublishController extends Controller { - - import ApplicationController._ - import ExceptionHandler._ - import play.api.libs.concurrent.Execution.Implicits._ - - /** - * never check validation on string. just redirect strings to kafka. - */ - val serviceNotExistException = new RuntimeException(s"service is not created in s2graph. create service first.") - - // private def toService(topic: String): String = { - // Service.findByName(topic).map(service => s"${service.serviceName}-${Config.PHASE}").getOrElse(throw serviceNotExistException) - // } - def publishOnly(topic: String) = withHeaderAsync(parse.text) { request => - if (!Config.IS_WRITE_SERVER) Future.successful(UNAUTHORIZED) - // val kafkaTopic = toService(topic) - val strs = request.body.split("\n") - strs.foreach(str => { - val keyedMessage = new ProducerRecord[Key, Val](Config.KAFKA_LOG_TOPIC, str) - // val keyedMessage = new ProducerRecord[Key, Val](kafkaTopic, s"$str") - // logger.debug(s"$kafkaTopic, $str") - ExceptionHandler.enqueue(KafkaMessage(keyedMessage)) - }) - Future.successful( - Ok("publish success.\n").withHeaders(CONNECTION -> "Keep-Alive", "Keep-Alive" -> "timeout=10, max=10") - ) - // try { - // - // } catch { - // case e: Exception => Future.successful(BadRequest(e.getMessage)) - // } - } - - def publish(topic: String) = publishOnly(topic) - - // def mutateBulk(topic: String) = Action.async(parse.text) { request => - // EdgeController.mutateAndPublish(Config.KAFKA_LOG_TOPIC, Config.KAFKA_FAIL_TOPIC, request.body).map { result => - // result.withHeaders(CONNECTION -> "Keep-Alive", "Keep-Alive" -> "timeout=10, max=10") - // } - // } - def mutateBulk(topic: String) = withHeaderAsync(parse.text) { request => - EdgeController.mutateAndPublish(request.body) - } -} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/f2311f25/app/controllers/QueryController.scala ---------------------------------------------------------------------- diff --git a/app/controllers/QueryController.scala b/app/controllers/QueryController.scala deleted file mode 100644 index f545b89..0000000 --- a/app/controllers/QueryController.scala +++ /dev/null @@ -1,310 +0,0 @@ -package controllers - - -import com.kakao.s2graph.core.GraphExceptions.BadQueryException -import com.kakao.s2graph.core._ -import com.kakao.s2graph.core.mysqls._ -import com.kakao.s2graph.core.types.{LabelWithDirection, VertexId} -import com.kakao.s2graph.core.utils.logger -import config.Config -import play.api.libs.json.{JsArray, JsObject, JsValue, Json} -import play.api.mvc.{Action, Controller, Result} - -import scala.concurrent._ -import scala.language.postfixOps -import scala.util.Try - -object QueryController extends Controller with RequestParser { - - import ApplicationController._ - import play.api.libs.concurrent.Execution.Implicits.defaultContext - - private val s2: Graph = com.kakao.s2graph.rest.Global.s2graph - - private def badQueryExceptionResults(ex: Exception) = Future.successful(BadRequest(Json.obj("message" -> ex.getMessage)).as(applicationJsonHeader)) - - private def errorResults = Future.successful(Ok(PostProcess.timeoutResults).as(applicationJsonHeader)) - - def getEdges() = withHeaderAsync(jsonParser) { request => - getEdgesInner(request.body) - } - - def getEdgesExcluded = withHeaderAsync(jsonParser) { request => - getEdgesExcludedInner(request.body) - } - - private def eachQuery(post: (Seq[QueryRequestWithResult], Seq[QueryRequestWithResult]) => JsValue)(q: Query): Future[JsValue] = { - val filterOutQueryResultsLs = q.filterOutQuery match { - case Some(filterOutQuery) => s2.getEdges(filterOutQuery) - case None => Future.successful(Seq.empty) - } - - for { - queryResultsLs <- s2.getEdges(q) - filterOutResultsLs <- filterOutQueryResultsLs - } yield { - val json = post(queryResultsLs, filterOutResultsLs) - json - } - } - - private def calcSize(js: JsValue): Int = js match { - case JsObject(obj) => (js \ "size").asOpt[Int].getOrElse(0) - case JsArray(seq) => seq.map(js => (js \ "size").asOpt[Int].getOrElse(0)).sum - case _ => 0 - } - - private def getEdgesAsync(jsonQuery: JsValue) - (post: (Seq[QueryRequestWithResult], Seq[QueryRequestWithResult]) => JsValue): Future[Result] = { - if (!Config.IS_QUERY_SERVER) Unauthorized.as(applicationJsonHeader) - val fetch = eachQuery(post) _ -// logger.info(jsonQuery) - - Try { - val future = jsonQuery match { - case JsArray(arr) => Future.traverse(arr.map(toQuery(_)))(fetch).map(JsArray) - case obj@JsObject(_) => fetch(toQuery(obj)) - case _ => throw BadQueryException("Cannot support") - } - - future map { json => jsonResponse(json, "result_size" -> calcSize(json).toString) } - - } recover { - case e: BadQueryException => - logger.error(s"$jsonQuery, $e", e) - badQueryExceptionResults(e) - case e: Exception => - logger.error(s"$jsonQuery, $e", e) - errorResults - } get - } - - @deprecated(message = "deprecated", since = "0.2") - private def getEdgesExcludedAsync(jsonQuery: JsValue) - (post: (Seq[QueryRequestWithResult], Seq[QueryRequestWithResult]) => JsValue): Future[Result] = { - - if (!Config.IS_QUERY_SERVER) Unauthorized.as(applicationJsonHeader) - - Try { - val q = toQuery(jsonQuery) - val filterOutQuery = Query(q.vertices, Vector(q.steps.last)) - - val fetchFuture = s2.getEdges(q) - val excludeFuture = s2.getEdges(filterOutQuery) - - for { - queryResultLs <- fetchFuture - exclude <- excludeFuture - } yield { - val json = post(queryResultLs, exclude) - jsonResponse(json, "result_size" -> calcSize(json).toString) - } - } recover { - case e: BadQueryException => - logger.error(s"$jsonQuery, $e", e) - badQueryExceptionResults(e) - case e: Exception => - logger.error(s"$jsonQuery, $e", e) - errorResults - } get - } - - def getEdgesInner(jsonQuery: JsValue) = { - getEdgesAsync(jsonQuery)(PostProcess.toSimpleVertexArrJson) - } - - def getEdgesExcludedInner(jsValue: JsValue) = { - getEdgesExcludedAsync(jsValue)(PostProcess.toSimpleVertexArrJson) - } - - def getEdgesWithGrouping() = withHeaderAsync(jsonParser) { request => - getEdgesWithGroupingInner(request.body) - } - - def getEdgesWithGroupingInner(jsonQuery: JsValue) = { - getEdgesAsync(jsonQuery)(PostProcess.summarizeWithListFormatted) - } - - def getEdgesExcludedWithGrouping() = withHeaderAsync(jsonParser) { request => - getEdgesExcludedWithGroupingInner(request.body) - } - - def getEdgesExcludedWithGroupingInner(jsonQuery: JsValue) = { - getEdgesExcludedAsync(jsonQuery)(PostProcess.summarizeWithListExcludeFormatted) - } - - def getEdgesGroupedInner(jsonQuery: JsValue) = { - getEdgesAsync(jsonQuery)(PostProcess.summarizeWithList) - } - - @deprecated(message = "deprecated", since = "0.2") - def getEdgesGrouped() = withHeaderAsync(jsonParser) { request => - getEdgesGroupedInner(request.body) - } - - @deprecated(message = "deprecated", since = "0.2") - def getEdgesGroupedExcluded() = withHeaderAsync(jsonParser) { request => - getEdgesGroupedExcludedInner(request.body) - } - - @deprecated(message = "deprecated", since = "0.2") - def getEdgesGroupedExcludedInner(jsonQuery: JsValue): Future[Result] = { - if (!Config.IS_QUERY_SERVER) Unauthorized.as(applicationJsonHeader) - - Try { - val q = toQuery(jsonQuery) - val filterOutQuery = Query(q.vertices, Vector(q.steps.last)) - - val fetchFuture = s2.getEdges(q) - val excludeFuture = s2.getEdges(filterOutQuery) - - for { - queryResultLs <- fetchFuture - exclude <- excludeFuture - } yield { - val json = PostProcess.summarizeWithListExclude(queryResultLs, exclude) - jsonResponse(json, "result_size" -> calcSize(json).toString) - } - } recover { - case e: BadQueryException => - logger.error(s"$jsonQuery, $e", e) - badQueryExceptionResults(e) - case e: Exception => - logger.error(s"$jsonQuery, $e", e) - errorResults - } get - } - - @deprecated(message = "deprecated", since = "0.2") - def getEdgesGroupedExcludedFormatted = withHeaderAsync(jsonParser) { request => - getEdgesGroupedExcludedFormattedInner(request.body) - } - - @deprecated(message = "deprecated", since = "0.2") - def getEdgesGroupedExcludedFormattedInner(jsonQuery: JsValue): Future[Result] = { - if (!Config.IS_QUERY_SERVER) Unauthorized.as(applicationJsonHeader) - - Try { - val q = toQuery(jsonQuery) - val filterOutQuery = Query(q.vertices, Vector(q.steps.last)) - - val fetchFuture = s2.getEdges(q) - val excludeFuture = s2.getEdges(filterOutQuery) - - for { - queryResultLs <- fetchFuture - exclude <- excludeFuture - } yield { - val json = PostProcess.summarizeWithListExcludeFormatted(queryResultLs, exclude) - jsonResponse(json, "result_size" -> calcSize(json).toString) - } - } recover { - case e: BadQueryException => - logger.error(s"$jsonQuery, $e", e) - badQueryExceptionResults(e) - case e: Exception => - logger.error(s"$jsonQuery, $e", e) - errorResults - } get - } - - def getEdge(srcId: String, tgtId: String, labelName: String, direction: String) = Action.async { request => - if (!Config.IS_QUERY_SERVER) Future.successful(Unauthorized) - val params = Json.arr(Json.obj("label" -> labelName, "direction" -> direction, "from" -> srcId, "to" -> tgtId)) - checkEdgesInner(params) - } - - /** - * Vertex - */ - - def checkEdgesInner(jsValue: JsValue) = { - try { - val params = jsValue.as[List[JsValue]] - var isReverted = false - val labelWithDirs = scala.collection.mutable.HashSet[LabelWithDirection]() - val quads = for { - param <- params - labelName <- (param \ "label").asOpt[String] - direction <- GraphUtil.toDir((param \ "direction").asOpt[String].getOrElse("out")) - label <- Label.findByName(labelName) - srcId <- jsValueToInnerVal((param \ "from").as[JsValue], label.srcColumnWithDir(direction.toInt).columnType, label.schemaVersion) - tgtId <- jsValueToInnerVal((param \ "to").as[JsValue], label.tgtColumnWithDir(direction.toInt).columnType, label.schemaVersion) - } yield { - val labelWithDir = LabelWithDirection(label.id.get, direction) - labelWithDirs += labelWithDir - val (src, tgt, dir) = if (direction == 1) { - isReverted = true - (Vertex(VertexId(label.tgtColumnWithDir(direction.toInt).id.get, tgtId)), - Vertex(VertexId(label.srcColumnWithDir(direction.toInt).id.get, srcId)), 0) - } else { - (Vertex(VertexId(label.srcColumnWithDir(direction.toInt).id.get, srcId)), - Vertex(VertexId(label.tgtColumnWithDir(direction.toInt).id.get, tgtId)), 0) - } - - // logger.debug(s"SrcVertex: $src") - // logger.debug(s"TgtVertex: $tgt") - // logger.debug(s"direction: $dir") - (src, tgt, QueryParam(LabelWithDirection(label.id.get, dir))) - } - - s2.checkEdges(quads).map { case queryRequestWithResultLs => - val edgeJsons = for { - queryRequestWithResult <- queryRequestWithResultLs - (queryRequest, queryResult) = QueryRequestWithResult.unapply(queryRequestWithResult).get - edgeWithScore <- queryResult.edgeWithScoreLs - (edge, score) = EdgeWithScore.unapply(edgeWithScore).get - convertedEdge = if (isReverted) edge.duplicateEdge else edge - edgeJson = PostProcess.edgeToJson(convertedEdge, score, queryRequest.query, queryRequest.queryParam) - } yield Json.toJson(edgeJson) - - val json = Json.toJson(edgeJsons) - jsonResponse(json, "result_size" -> edgeJsons.size.toString) - } - } catch { - case e: Exception => - logger.error(s"$jsValue, $e", e) - errorResults - } - } - - def checkEdges() = withHeaderAsync(jsonParser) { request => - if (!Config.IS_QUERY_SERVER) Future.successful(Unauthorized) - - checkEdgesInner(request.body) - } - - def getVertices() = withHeaderAsync(jsonParser) { request => - getVerticesInner(request.body) - } - - def getVerticesInner(jsValue: JsValue) = { - if (!Config.IS_QUERY_SERVER) Unauthorized.as(applicationJsonHeader) - - val jsonQuery = jsValue - val ts = System.currentTimeMillis() - val props = "{}" - - Try { - val vertices = jsonQuery.as[List[JsValue]].flatMap { js => - val serviceName = (js \ "serviceName").as[String] - val columnName = (js \ "columnName").as[String] - for (id <- (js \ "ids").asOpt[List[JsValue]].getOrElse(List.empty[JsValue])) yield { - Management.toVertex(ts, "insert", id.toString, serviceName, columnName, props) - } - } - - s2.getVertices(vertices) map { vertices => - val json = PostProcess.verticesToJson(vertices) - jsonResponse(json, "result_size" -> calcSize(json).toString) - } - } recover { - case e: play.api.libs.json.JsResultException => - logger.error(s"$jsonQuery, $e", e) - badQueryExceptionResults(e) - case e: Exception => - logger.error(s"$jsonQuery, $e", e) - errorResults - } get - } -} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/f2311f25/app/controllers/RequestParser.scala ---------------------------------------------------------------------- diff --git a/app/controllers/RequestParser.scala b/app/controllers/RequestParser.scala deleted file mode 100644 index e2ac8d6..0000000 --- a/app/controllers/RequestParser.scala +++ /dev/null @@ -1,460 +0,0 @@ -package controllers - -import com.kakao.s2graph.core.GraphExceptions.{BadQueryException, ModelNotFoundException} -import com.kakao.s2graph.core._ -import com.kakao.s2graph.core.mysqls._ -import com.kakao.s2graph.core.parsers.WhereParser -import com.kakao.s2graph.core.types._ -import config.Config -import play.Play -import play.api.libs.json._ - -import scala.util.{Failure, Success, Try} - - -trait RequestParser extends JSONParser { - - import Management.JsonModel._ - - val hardLimit = Config.QUERY_HARD_LIMIT - val defaultLimit = 100 - - lazy val defaultCluster = Play.application().configuration().getString("hbase.zookeeper.quorum") - lazy val defaultCompressionAlgorithm = Play.application().configuration.getString("hbase.table.compression.algorithm") - - private def extractScoring(labelId: Int, value: JsValue) = { - val ret = for { - js <- parse[Option[JsObject]](value, "scoring") - } yield { - for { - (k, v) <- js.fields - labelOrderType <- LabelMeta.findByName(labelId, k) - } yield { - val value = v match { - case n: JsNumber => n.as[Double] - case _ => throw new Exception("scoring weight should be double.") - } - (labelOrderType.seq, value) - } - } - ret - } - - def extractInterval(label: Label, jsValue: JsValue) = { - def extractKv(js: JsValue) = js match { - case JsObject(obj) => obj - case JsArray(arr) => arr.flatMap { - case JsObject(obj) => obj - case _ => throw new RuntimeException(s"cannot support json type $js") - } - case _ => throw new RuntimeException(s"cannot support json type: $js") - } - - val ret = for { - js <- parse[Option[JsObject]](jsValue, "interval") - fromJs <- (js \ "from").asOpt[JsValue] - toJs <- (js \ "to").asOpt[JsValue] - } yield { - val from = Management.toProps(label, extractKv(fromJs)) - val to = Management.toProps(label, extractKv(toJs)) - (from, to) - } - - ret - } - - def extractDuration(label: Label, jsValue: JsValue) = { - for { - js <- parse[Option[JsObject]](jsValue, "duration") - } yield { - val minTs = parse[Option[Long]](js, "from").getOrElse(Long.MaxValue) - val maxTs = parse[Option[Long]](js, "to").getOrElse(Long.MinValue) - (minTs, maxTs) - } - } - - def extractHas(label: Label, jsValue: JsValue) = { - val ret = for { - js <- parse[Option[JsObject]](jsValue, "has") - } yield { - for { - (k, v) <- js.fields - labelMeta <- LabelMeta.findByName(label.id.get, k) - value <- jsValueToInnerVal(v, labelMeta.dataType, label.schemaVersion) - } yield { - labelMeta.seq -> value - } - } - ret.map(_.toMap).getOrElse(Map.empty[Byte, InnerValLike]) - } - - def extractWhere(labelMap: Map[String, Label], jsValue: JsValue) = { - (jsValue \ "where").asOpt[String] match { - case None => Success(WhereParser.success) - case Some(where) => - WhereParser(labelMap).parse(where) match { - case s@Success(_) => s - case Failure(ex) => throw BadQueryException(ex.getMessage, ex) - } - } - } - - def toVertices(labelName: String, direction: String, ids: Seq[JsValue]): Seq[Vertex] = { - val vertices = for { - label <- Label.findByName(labelName).toSeq - serviceColumn = if (direction == "out") label.srcColumn else label.tgtColumn - id <- ids - innerId <- jsValueToInnerVal(id, serviceColumn.columnType, label.schemaVersion) - } yield { - Vertex(SourceVertexId(serviceColumn.id.get, innerId), System.currentTimeMillis()) - } - vertices.toSeq - } - - def toQuery(jsValue: JsValue, isEdgeQuery: Boolean = true): Query = { - try { - val vertices = - (for { - value <- parse[List[JsValue]](jsValue, "srcVertices") - serviceName = parse[String](value, "serviceName") - column = parse[String](value, "columnName") - } yield { - val service = Service.findByName(serviceName).getOrElse(throw BadQueryException("service not found")) - val col = ServiceColumn.find(service.id.get, column).getOrElse(throw BadQueryException("bad column name")) - val (idOpt, idsOpt) = ((value \ "id").asOpt[JsValue], (value \ "ids").asOpt[List[JsValue]]) - for { - idVal <- idOpt ++ idsOpt.toSeq.flatten - - /** bug, need to use labels schemaVersion */ - innerVal <- jsValueToInnerVal(idVal, col.columnType, col.schemaVersion) - } yield { - Vertex(SourceVertexId(col.id.get, innerVal), System.currentTimeMillis()) - } - }).flatten - - if (vertices.isEmpty) throw BadQueryException("srcVertices`s id is empty") - - val filterOutFields = (jsValue \ "filterOutFields").asOpt[List[String]].getOrElse(List(LabelMeta.to.name)) - val filterOutQuery = (jsValue \ "filterOut").asOpt[JsValue].map { v => toQuery(v) }.map { q => q.copy(filterOutFields = filterOutFields) } - val steps = parse[Vector[JsValue]](jsValue, "steps") - val removeCycle = (jsValue \ "removeCycle").asOpt[Boolean].getOrElse(true) - val selectColumns = (jsValue \ "select").asOpt[List[String]].getOrElse(List.empty) - val groupByColumns = (jsValue \ "groupBy").asOpt[List[String]].getOrElse(List.empty) - val orderByColumns: List[(String, Boolean)] = (jsValue \ "orderBy").asOpt[List[JsObject]].map { jsLs => - for { - js <- jsLs - (column, orderJs) <- js.fields - } yield { - val ascending = orderJs.as[String].toUpperCase match { - case "ASC" => true - case "DESC" => false - } - column -> ascending - } - }.getOrElse(List("score" -> false, "timestamp" -> false)) - val withScore = (jsValue \ "withScore").asOpt[Boolean].getOrElse(true) - val returnTree = (jsValue \ "returnTree").asOpt[Boolean].getOrElse(false) - - // TODO: throw exception, when label dosn't exist - val labelMap = (for { - js <- jsValue \\ "label" - labelName <- js.asOpt[String] - label <- Label.findByName(labelName) - } yield (labelName, label)).toMap - - val querySteps = - steps.zipWithIndex.map { case (step, stepIdx) => - val labelWeights = step match { - case obj: JsObject => - val converted = for { - (k, v) <- (obj \ "weights").asOpt[JsObject].getOrElse(Json.obj()).fields - l <- Label.findByName(k) - } yield { - l.id.get -> v.toString().toDouble - } - converted.toMap - case _ => Map.empty[Int, Double] - } - val queryParamJsVals = step match { - case arr: JsArray => arr.as[List[JsValue]] - case obj: JsObject => (obj \ "step").as[List[JsValue]] - case _ => List.empty[JsValue] - } - val nextStepScoreThreshold = step match { - case obj: JsObject => (obj \ "nextStepThreshold").asOpt[Double].getOrElse(QueryParam.DefaultThreshold) - case _ => QueryParam.DefaultThreshold - } - val nextStepLimit = step match { - case obj: JsObject => (obj \ "nextStepLimit").asOpt[Int].getOrElse(-1) - case _ => -1 - } - val cacheTTL = step match { - case obj: JsObject => (obj \ "cacheTTL").asOpt[Int].getOrElse(-1) - case _ => -1 - } - val queryParams = - for { - labelGroup <- queryParamJsVals - queryParam <- parseQueryParam(labelMap, labelGroup) - } yield { - val (_, columnName) = - if (queryParam.labelWithDir.dir == GraphUtil.directions("out")) { - (queryParam.label.srcService.serviceName, queryParam.label.srcColumnName) - } else { - (queryParam.label.tgtService.serviceName, queryParam.label.tgtColumnName) - } - //FIXME: - if (stepIdx == 0 && vertices.nonEmpty && !vertices.exists(v => v.serviceColumn.columnName == columnName)) { - throw BadQueryException("srcVertices contains incompatiable serviceName or columnName with first step.") - } - - queryParam - } - Step(queryParams.toList, labelWeights = labelWeights, - // scoreThreshold = stepThreshold, - nextStepScoreThreshold = nextStepScoreThreshold, - nextStepLimit = nextStepLimit, - cacheTTL = cacheTTL) - - } - - val ret = Query( - vertices, - querySteps, - removeCycle = removeCycle, - selectColumns = selectColumns, - groupByColumns = groupByColumns, - orderByColumns = orderByColumns, - filterOutQuery = filterOutQuery, - filterOutFields = filterOutFields, - withScore = withScore, - returnTree = returnTree - ) - // logger.debug(ret.toString) - ret - } catch { - case e: BadQueryException => - throw e - case e: ModelNotFoundException => - throw BadQueryException(e.getMessage, e) - case e: Exception => - throw BadQueryException(s"$jsValue, $e", e) - } - } - - private def parseQueryParam(labelMap: Map[String, Label], labelGroup: JsValue): Option[QueryParam] = { - for { - labelName <- parse[Option[String]](labelGroup, "label") - } yield { - val label = Label.findByName(labelName).getOrElse(throw BadQueryException(s"$labelName not found")) - val direction = parse[Option[String]](labelGroup, "direction").map(GraphUtil.toDirection(_)).getOrElse(0) - val limit = { - parse[Option[Int]](labelGroup, "limit") match { - case None => defaultLimit - case Some(l) if l < 0 => Int.MaxValue - case Some(l) if l >= 0 => - val default = hardLimit - Math.min(l, default) - } - } - val offset = parse[Option[Int]](labelGroup, "offset").getOrElse(0) - val interval = extractInterval(label, labelGroup) - val duration = extractDuration(label, labelGroup) - val scoring = extractScoring(label.id.get, labelGroup).getOrElse(List.empty[(Byte, Double)]).toList - val exclude = parse[Option[Boolean]](labelGroup, "exclude").getOrElse(false) - val include = parse[Option[Boolean]](labelGroup, "include").getOrElse(false) - val hasFilter = extractHas(label, labelGroup) - val labelWithDir = LabelWithDirection(label.id.get, direction) - val indexNameOpt = (labelGroup \ "index").asOpt[String] - val indexSeq = indexNameOpt match { - case None => label.indexSeqsMap.get(scoring.map(kv => kv._1)).map(_.seq).getOrElse(LabelIndex.DefaultSeq) - case Some(indexName) => label.indexNameMap.get(indexName).map(_.seq).getOrElse(throw new RuntimeException("cannot find index")) - } - val where = extractWhere(labelMap, labelGroup) - val includeDegree = (labelGroup \ "includeDegree").asOpt[Boolean].getOrElse(true) - val rpcTimeout = (labelGroup \ "rpcTimeout").asOpt[Int].getOrElse(Config.RPC_TIMEOUT) - val maxAttempt = (labelGroup \ "maxAttempt").asOpt[Int].getOrElse(Config.MAX_ATTEMPT) - val tgtVertexInnerIdOpt = (labelGroup \ "_to").asOpt[JsValue].flatMap { jsVal => - jsValueToInnerVal(jsVal, label.tgtColumnWithDir(direction).columnType, label.schemaVersion) - } - val cacheTTL = (labelGroup \ "cacheTTL").asOpt[Long].getOrElse(-1L) - val timeDecayFactor = (labelGroup \ "timeDecay").asOpt[JsObject].map { jsVal => - val initial = (jsVal \ "initial").asOpt[Double].getOrElse(1.0) - val decayRate = (jsVal \ "decayRate").asOpt[Double].getOrElse(0.1) - if (decayRate >= 1.0 || decayRate <= 0.0) throw new BadQueryException("decay rate should be 0.0 ~ 1.0") - val timeUnit = (jsVal \ "timeUnit").asOpt[Double].getOrElse(60 * 60 * 24.0) - TimeDecay(initial, decayRate, timeUnit) - } - val threshold = (labelGroup \ "threshold").asOpt[Double].getOrElse(QueryParam.DefaultThreshold) - // TODO: refactor this. dirty - val duplicate = parse[Option[String]](labelGroup, "duplicate").map(s => Query.DuplicatePolicy(s)) - - val outputField = (labelGroup \ "outputField").asOpt[String].map(s => Json.arr(Json.arr(s))) - val transformer = if (outputField.isDefined) outputField else (labelGroup \ "transform").asOpt[JsValue] - val scorePropagateOp = (labelGroup \ "scorePropagateOp").asOpt[String].getOrElse("multiply") - val sample = (labelGroup \ "sample").asOpt[Int].getOrElse(-1) - - // FIXME: Order of command matter - QueryParam(labelWithDir) - .sample(sample) - .limit(offset, limit) - .rank(RankParam(label.id.get, scoring)) - .exclude(exclude) - .include(include) - .interval(interval) - .duration(duration) - .has(hasFilter) - .labelOrderSeq(indexSeq) - .where(where) - .duplicatePolicy(duplicate) - .includeDegree(includeDegree) - .rpcTimeout(rpcTimeout) - .maxAttempt(maxAttempt) - .tgtVertexInnerIdOpt(tgtVertexInnerIdOpt) - .cacheTTLInMillis(cacheTTL) - .timeDecay(timeDecayFactor) - .threshold(threshold) - .transformer(transformer) - .scorePropagateOp(scorePropagateOp) - } - } - - private def parse[R](js: JsValue, key: String)(implicit read: Reads[R]): R = { - (js \ key).validate[R] - .fold( - errors => { - val msg = (JsError.toFlatJson(errors) \ "obj").as[List[JsValue]].map(x => x \ "msg") - val e = Json.obj("args" -> key, "error" -> msg) - throw new GraphExceptions.JsonParseException(Json.obj("error" -> key).toString) - }, - r => { - r - }) - } - - def toJsValues(jsValue: JsValue): List[JsValue] = { - jsValue match { - case obj: JsObject => List(obj) - case arr: JsArray => arr.as[List[JsValue]] - case _ => List.empty[JsValue] - } - - } - - def toEdges(jsValue: JsValue, operation: String): List[Edge] = { - toJsValues(jsValue).map(toEdge(_, operation)) - } - - def toEdge(jsValue: JsValue, operation: String) = { - - val srcId = parse[JsValue](jsValue, "from") match { - case s: JsString => s.as[String] - case o@_ => s"${o}" - } - val tgtId = parse[JsValue](jsValue, "to") match { - case s: JsString => s.as[String] - case o@_ => s"${o}" - } - val label = parse[String](jsValue, "label") - val timestamp = parse[Long](jsValue, "timestamp") - val direction = parse[Option[String]](jsValue, "direction").getOrElse("") - val props = (jsValue \ "props").asOpt[JsValue].getOrElse("{}") - Management.toEdge(timestamp, operation, srcId, tgtId, label, direction, props.toString) - - } - - def toVertices(jsValue: JsValue, operation: String, serviceName: Option[String] = None, columnName: Option[String] = None) = { - toJsValues(jsValue).map(toVertex(_, operation, serviceName, columnName)) - } - - def toVertex(jsValue: JsValue, operation: String, serviceName: Option[String] = None, columnName: Option[String] = None): Vertex = { - val id = parse[JsValue](jsValue, "id") - val ts = parse[Option[Long]](jsValue, "timestamp").getOrElse(System.currentTimeMillis()) - val sName = if (serviceName.isEmpty) parse[String](jsValue, "serviceName") else serviceName.get - val cName = if (columnName.isEmpty) parse[String](jsValue, "columnName") else columnName.get - val props = (jsValue \ "props").asOpt[JsObject].getOrElse(Json.obj()) - Management.toVertex(ts, operation, id.toString, sName, cName, props.toString) - } - - def toPropElements(jsObj: JsValue) = Try { - val propName = (jsObj \ "name").as[String] - val dataType = InnerVal.toInnerDataType((jsObj \ "dataType").as[String]) - val defaultValue = (jsObj \ "defaultValue").as[JsValue] match { - case JsString(s) => s - case _@js => js.toString - } - Prop(propName, defaultValue, dataType) - } - - def toPropsElements(jsValue: JsValue): Seq[Prop] = for { - jsObj <- jsValue.asOpt[Seq[JsValue]].getOrElse(Nil) - } yield { - val propName = (jsObj \ "name").as[String] - val dataType = InnerVal.toInnerDataType((jsObj \ "dataType").as[String]) - val defaultValue = (jsObj \ "defaultValue").as[JsValue] match { - case JsString(s) => s - case _@js => js.toString - } - Prop(propName, defaultValue, dataType) - } - - def toIndicesElements(jsValue: JsValue): Seq[Index] = for { - jsObj <- jsValue.as[Seq[JsValue]] - indexName = (jsObj \ "name").as[String] - propNames = (jsObj \ "propNames").as[Seq[String]] - } yield Index(indexName, propNames) - - def toLabelElements(jsValue: JsValue) = Try { - val labelName = parse[String](jsValue, "label") - val srcServiceName = parse[String](jsValue, "srcServiceName") - val tgtServiceName = parse[String](jsValue, "tgtServiceName") - val srcColumnName = parse[String](jsValue, "srcColumnName") - val tgtColumnName = parse[String](jsValue, "tgtColumnName") - val srcColumnType = parse[String](jsValue, "srcColumnType") - val tgtColumnType = parse[String](jsValue, "tgtColumnType") - val serviceName = (jsValue \ "serviceName").asOpt[String].getOrElse(tgtServiceName) - val isDirected = (jsValue \ "isDirected").asOpt[Boolean].getOrElse(true) - - val allProps = toPropsElements(jsValue \ "props") - val indices = toIndicesElements(jsValue \ "indices") - - val consistencyLevel = (jsValue \ "consistencyLevel").asOpt[String].getOrElse("weak") - - // expect new label don`t provide hTableName - val hTableName = (jsValue \ "hTableName").asOpt[String] - val hTableTTL = (jsValue \ "hTableTTL").asOpt[Int] - val schemaVersion = (jsValue \ "schemaVersion").asOpt[String].getOrElse(HBaseType.DEFAULT_VERSION) - val isAsync = (jsValue \ "isAsync").asOpt[Boolean].getOrElse(false) - val compressionAlgorithm = (jsValue \ "compressionAlgorithm").asOpt[String].getOrElse(defaultCompressionAlgorithm) - - (labelName, srcServiceName, srcColumnName, srcColumnType, - tgtServiceName, tgtColumnName, tgtColumnType, isDirected, serviceName, - indices, allProps, consistencyLevel, hTableName, hTableTTL, schemaVersion, isAsync, compressionAlgorithm) - } - - def toIndexElements(jsValue: JsValue) = Try { - val labelName = parse[String](jsValue, "label") - val indices = toIndicesElements(jsValue \ "indices") - (labelName, indices) - } - - def toServiceElements(jsValue: JsValue) = { - val serviceName = parse[String](jsValue, "serviceName") - val cluster = (jsValue \ "cluster").asOpt[String].getOrElse(defaultCluster) - val hTableName = (jsValue \ "hTableName").asOpt[String].getOrElse(s"${serviceName}-${Config.PHASE}") - val preSplitSize = (jsValue \ "preSplitSize").asOpt[Int].getOrElse(1) - val hTableTTL = (jsValue \ "hTableTTL").asOpt[Int] - val compressionAlgorithm = (jsValue \ "compressionAlgorithm").asOpt[String].getOrElse(defaultCompressionAlgorithm) - (serviceName, cluster, hTableName, preSplitSize, hTableTTL, compressionAlgorithm) - } - - def toServiceColumnElements(jsValue: JsValue) = Try { - val serviceName = parse[String](jsValue, "serviceName") - val columnName = parse[String](jsValue, "columnName") - val columnType = parse[String](jsValue, "columnType") - val props = toPropsElements(jsValue \ "props") - (serviceName, columnName, columnType, props) - } - - -} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/f2311f25/app/controllers/TestController.scala ---------------------------------------------------------------------- diff --git a/app/controllers/TestController.scala b/app/controllers/TestController.scala deleted file mode 100644 index 8558ae5..0000000 --- a/app/controllers/TestController.scala +++ /dev/null @@ -1,24 +0,0 @@ -package controllers - -import play.api.mvc.{Action, Controller} -import util.TestDataLoader - -import scala.concurrent.Future - - -object TestController extends Controller { - import ApplicationController._ - - def getRandomId() = withHeader(parse.anyContent) { request => - val id = TestDataLoader.randomId - Ok(s"${id}") - } - - def pingAsync() = Action.async(parse.json) { request => - Future.successful(Ok("Pong\n")) - } - - def ping() = Action(parse.json) { request => - Ok("Pong\n") - } -} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/f2311f25/app/controllers/VertexController.scala ---------------------------------------------------------------------- diff --git a/app/controllers/VertexController.scala b/app/controllers/VertexController.scala deleted file mode 100644 index 9c6de20..0000000 --- a/app/controllers/VertexController.scala +++ /dev/null @@ -1,84 +0,0 @@ -package controllers - - -import actors.QueueActor -import com.kakao.s2graph.core.utils.logger -import com.kakao.s2graph.core.{ExceptionHandler, Graph, GraphExceptions} -import config.Config -import play.api.libs.json.{JsValue, Json} -import play.api.mvc.{Controller, Result} - -import scala.concurrent.Future - -object VertexController extends Controller with RequestParser { - private val s2: Graph = com.kakao.s2graph.rest.Global.s2graph - - import ExceptionHandler._ - import controllers.ApplicationController._ - import play.api.libs.concurrent.Execution.Implicits._ - - def tryMutates(jsValue: JsValue, operation: String, serviceNameOpt: Option[String] = None, columnNameOpt: Option[String] = None, withWait: Boolean = false): Future[Result] = { - if (!Config.IS_WRITE_SERVER) Future.successful(Unauthorized) - else { - try { - val vertices = toVertices(jsValue, operation, serviceNameOpt, columnNameOpt) - - for (vertex <- vertices) { - if (vertex.isAsync) - ExceptionHandler.enqueue(toKafkaMessage(Config.KAFKA_LOG_TOPIC_ASYNC, vertex, None)) - else - ExceptionHandler.enqueue(toKafkaMessage(Config.KAFKA_LOG_TOPIC, vertex, None)) - } - - //FIXME: - val verticesToStore = vertices.filterNot(v => v.isAsync) - - if (withWait) { - val rets = s2.mutateVertices(verticesToStore, withWait = true) - rets.map(Json.toJson(_)).map(jsonResponse(_)) - } else { - val rets = verticesToStore.map { vertex => QueueActor.router ! vertex; true } - Future.successful(jsonResponse(Json.toJson(rets))) - } - } catch { - case e: GraphExceptions.JsonParseException => Future.successful(BadRequest(s"e")) - case e: Exception => - logger.error(s"[Failed] tryMutates", e) - Future.successful(InternalServerError(s"${e.getStackTrace}")) - } - } - } - - def inserts() = withHeaderAsync(jsonParser) { request => - tryMutates(request.body, "insert") - } - - def insertsWithWait() = withHeaderAsync(jsonParser) { request => - tryMutates(request.body, "insert", withWait = true) - } - - def insertsSimple(serviceName: String, columnName: String) = withHeaderAsync(jsonParser) { request => - tryMutates(request.body, "insert", Some(serviceName), Some(columnName)) - } - - def deletes() = withHeaderAsync(jsonParser) { request => - tryMutates(request.body, "delete") - } - - def deletesWithWait() = withHeaderAsync(jsonParser) { request => - tryMutates(request.body, "delete", withWait = true) - } - - def deletesSimple(serviceName: String, columnName: String) = withHeaderAsync(jsonParser) { request => - tryMutates(request.body, "delete", Some(serviceName), Some(columnName)) - } - - def deletesAll() = withHeaderAsync(jsonParser) { request => - tryMutates(request.body, "deleteAll") - } - - def deletesAllSimple(serviceName: String, columnName: String) = withHeaderAsync(jsonParser) { request => - tryMutates(request.body, "deleteAll", Some(serviceName), Some(columnName)) - } - -} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/f2311f25/app/models/ExactCounterItem.scala ---------------------------------------------------------------------- diff --git a/app/models/ExactCounterItem.scala b/app/models/ExactCounterItem.scala deleted file mode 100644 index 244c046..0000000 --- a/app/models/ExactCounterItem.scala +++ /dev/null @@ -1,38 +0,0 @@ -package models - -import play.api.libs.json.{Json, Writes} - -/** - * Created by alec on 15. 4. 15.. - */ -case class ExactCounterItem(ts: Long, count: Long, score: Double) - -case class ExactCounterIntervalItem(interval: String, dimension: Map[String, String], counter: Seq[ExactCounterItem]) - -case class ExactCounterResultMeta(service: String, action: String, item: String) - -case class ExactCounterResult(meta: ExactCounterResultMeta, data: Seq[ExactCounterIntervalItem]) - -object ExactCounterItem { - implicit val writes = new Writes[ExactCounterItem] { - def writes(item: ExactCounterItem) = Json.obj( - "ts" -> item.ts, - "time" -> tsFormat.format(item.ts), - "count" -> item.count, - "score" -> item.score - ) - } - implicit val reads = Json.reads[ExactCounterItem] -} - -object ExactCounterIntervalItem { - implicit val format = Json.format[ExactCounterIntervalItem] -} - -object ExactCounterResultMeta { - implicit val format = Json.format[ExactCounterResultMeta] -} - -object ExactCounterResult { - implicit val formats = Json.format[ExactCounterResult] -} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/f2311f25/app/models/RankCounterItem.scala ---------------------------------------------------------------------- diff --git a/app/models/RankCounterItem.scala b/app/models/RankCounterItem.scala deleted file mode 100644 index aaa7df7..0000000 --- a/app/models/RankCounterItem.scala +++ /dev/null @@ -1,40 +0,0 @@ -package models - -import play.api.libs.json.{Json, Writes} - -/** - * Created by alec on 15. 4. 15.. - */ -case class RankCounterItem(rank: Int, id: String, score: Double) - -case class RankCounterDimensionItem(interval: String, ts: Long, dimension: String, total: Double, ranks: Seq[RankCounterItem]) - -case class RankCounterResultMeta(service: String, action: String) - -case class RankCounterResult(meta: RankCounterResultMeta, data: Seq[RankCounterDimensionItem]) - -object RankCounterItem { - implicit val format = Json.format[RankCounterItem] -} - -object RankCounterDimensionItem { - implicit val writes = new Writes[RankCounterDimensionItem] { - def writes(item: RankCounterDimensionItem) = Json.obj( - "interval" -> item.interval, - "ts" -> item.ts, - "time" -> tsFormat.format(item.ts), - "dimension" -> item.dimension, - "total" -> item.total, - "ranks" -> item.ranks - ) - } - implicit val reads = Json.reads[RankCounterDimensionItem] -} - -object RankCounterResultMeta { - implicit val format = Json.format[RankCounterResultMeta] -} - -object RankCounterResult { - implicit val format = Json.format[RankCounterResult] -} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/f2311f25/app/models/package.scala ---------------------------------------------------------------------- diff --git a/app/models/package.scala b/app/models/package.scala deleted file mode 100644 index 17fa8e1..0000000 --- a/app/models/package.scala +++ /dev/null @@ -1,8 +0,0 @@ -import java.text.SimpleDateFormat - -/** - * Created by alec on 15. 4. 20.. - */ -package object models { - def tsFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss") -} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/f2311f25/app/util/TestDataLoader.scala ---------------------------------------------------------------------- diff --git a/app/util/TestDataLoader.scala b/app/util/TestDataLoader.scala deleted file mode 100644 index 45a9b61..0000000 --- a/app/util/TestDataLoader.scala +++ /dev/null @@ -1,70 +0,0 @@ -package util - -import java.io.File - -import scala.collection.mutable.{ArrayBuffer, HashMap, ListBuffer} -import scala.io.Source -import scala.util.Random - - -object TestDataLoader { - val step = 100 - val prob = 1.0 - val (testIds, testIdsHist, testIdsHistCnt) = loadSeeds("./talk_vertices.txt") - val maxId = testIds.length - // val randoms = (0 until 100).map{ i => new SecureRandom } - // val idx = new AtomicInteger(0) - // def randomId() = { - // val r = randoms(idx.getAndIncrement() % randoms.size) - // testAccountIds(r.nextInt(maxId)) - // } - def randomId(histStep: Int) = { - for { - maxId <- testIdsHistCnt.get(histStep) - rIdx = Random.nextInt(maxId.toInt) - hist <- testIdsHist.get(histStep) - id = hist(rIdx) - } yield { -// logger.debug(s"randomId: $histStep = $id[$rIdx / $maxId]") - id - } - } - def randomId() = { - val id = testIds(Random.nextInt(maxId)) - // logger.debug(s"$id") - id - } - private def loadSeeds(filePath: String) = { - val histogram = new HashMap[Long, ListBuffer[Long]] - val histogramCnt = new HashMap[Long, Long] - val ids = new ArrayBuffer[Long] - - var idx = 0 -// logger.debug(s"$filePath start to load file.") - for (line <- Source.fromFile(new File(filePath)).getLines) { - // testAccountIds(idx) = line.toLong -// if (idx % 10000 == 0) logger.debug(s"$idx") - idx += 1 - - val parts = line.split("\\t") - val id = parts.head.toLong - val count = parts.last.toLong / step - if (count > 1 && Random.nextDouble < prob) { - histogram.get(count) match { - case None => - histogram.put(count, new ListBuffer[Long]) - histogram.get(count).get += id - histogramCnt.put(count, 1) - case Some(existed) => - existed += id - histogramCnt.put(count, histogramCnt.getOrElse(count, 0L) + 1L) - } - ids += id - } - - } -// logger.debug(s"upload $filePath finished.") -// logger.debug(s"${histogram.size}") - (ids, histogram.map(t => (t._1 -> t._2.toArray[Long])), histogramCnt) - } -} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/f2311f25/build.sbt ---------------------------------------------------------------------- diff --git a/build.sbt b/build.sbt index 03acb37..30e5b64 100755 --- a/build.sbt +++ b/build.sbt @@ -5,7 +5,7 @@ lazy val commonSettings = Seq( scalaVersion := "2.11.7", version := "0.12.1-SNAPSHOT", scalacOptions := Seq("-language:postfixOps", "-unchecked", "-deprecation", "-feature", "-Xlint"), - javaOptions ++= collection.JavaConversions.propertiesAsScalaMap(System.getProperties).map{ case (key, value) => "-D" + key + "=" + value }.toSeq, + javaOptions ++= collection.JavaConversions.propertiesAsScalaMap(System.getProperties).map { case (key, value) => "-D" + key + "=" + value }.toSeq, testOptions in Test += Tests.Argument("-oDF"), parallelExecution in Test := false, resolvers ++= Seq( @@ -14,13 +14,17 @@ lazy val commonSettings = Seq( "Cloudera" at "https://repository.cloudera.com/artifactory/cloudera-repos", "Twitter Maven" at "http://maven.twttr.com", "sonatype-snapshots" at "https://oss.sonatype.org/content/repositories/snapshots", - "Scalaz Bintray Repo" at "http://dl.bintray.com/scalaz/releases" + "Scalaz Bintray Repo" at "http://dl.bintray.com/scalaz/releases", + "sonatype-releases" at "https://oss.sonatype.org/content/repositories/releases/" ) ) -lazy val root = project.in(file(".")).enablePlugins(PlayScala) +Revolver.settings + +lazy val s2rest_play = project.enablePlugins(PlayScala) .dependsOn(s2core, s2counter_core) .settings(commonSettings: _*) + .settings(testOptions in Test += Tests.Argument("sequential")) lazy val s2core = project.settings(commonSettings: _*) @@ -35,17 +39,12 @@ lazy val s2counter_core = project.dependsOn(s2core) lazy val s2counter_loader = project.dependsOn(s2counter_core, spark) .settings(commonSettings: _*) -libraryDependencies ++= Seq( - ws, - filters, - "xalan" % "serializer" % "2.7.2", // Download in Intelli J(Download Source/Document) - "com.github.danielwegener" % "logback-kafka-appender" % "0.0.3", - "org.json4s" %% "json4s-native" % "3.2.11" % Test -) - +lazy val root = (project in file(".")) + .aggregate(s2core, s2rest_play) + .settings(commonSettings: _*) lazy val runRatTask = taskKey[Unit]("Runs Apache rat on S2Graph") runRatTask := { "sh bin/run-rat.sh" ! -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/f2311f25/conf/logger.xml ---------------------------------------------------------------------- diff --git a/conf/logger.xml b/conf/logger.xml deleted file mode 100644 index 2d767c2..0000000 --- a/conf/logger.xml +++ /dev/null @@ -1,83 +0,0 @@ -<configuration> - - <conversionRule conversionWord="coloredLevel" converterClass="play.api.Logger$ColoredLevel"/> - - <appender name="FILE" class="ch.qos.logback.core.rolling.RollingFileAppender"> - <file>logs/application.log</file> - - <rollingPolicy class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy"> - <fileNamePattern>logs/application.%i.log</fileNamePattern> - <minIndex>1</minIndex> - <maxIndex>9</maxIndex> - </rollingPolicy> - - <triggeringPolicy class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy"> - <maxFileSize>500MB</maxFileSize> - </triggeringPolicy> - <encoder> - <pattern>%date [%level] [%logger] [%thread] - %message %xException%n</pattern> - </encoder> - </appender> - - - <appender name="ERROR" class="ch.qos.logback.core.rolling.RollingFileAppender"> - <file>logs/error.log</file> - <append>true</append> - <encoder> - <pattern>%date [%level] [%logger] [%thread] - %message %xException%n</pattern> - </encoder> - <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy"> - <fileNamePattern>logs/error.log.%d.%i</fileNamePattern> - <timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP"> - <maxFileSize>500MB</maxFileSize> - </timeBasedFileNamingAndTriggeringPolicy> - <maxHistory>3</maxHistory> - </rollingPolicy> - </appender> - - <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"> - <encoder> - <pattern>%coloredLevel %logger{15} - %message%n%xException%n</pattern> - </encoder> - </appender> - - <appender name="ACTOR" class="ch.qos.logback.core.rolling.RollingFileAppender"> - <file>logs/actor.log</file> - <append>true</append> - <encoder> - <pattern>%date [%level] [%logger] [%thread] - %message %xException%n</pattern> - </encoder> - <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy"> - <fileNamePattern>logs/actor.log.%d.%i</fileNamePattern> - <timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP"> - <maxFileSize>200MB</maxFileSize> - </timeBasedFileNamingAndTriggeringPolicy> - <maxHistory>7</maxHistory> - </rollingPolicy> - </appender> - - <logger name="play" level="INFO"> - <appender-ref ref="STDOUT"/> - <appender-ref ref="FILE"/> - </logger> - - <logger name="application" level="INFO"> - <appender-ref ref="STDOUT"/> - <appender-ref ref="FILE"/> - </logger> - - <logger name="error" level="INFO"> - <appender-ref ref="STDOUT"/> - <appender-ref ref="ERROR"/> - </logger> - - <logger name="actor" level="INFO"> - <appender-ref ref="ACTOR"/> - </logger> - - <logger name="akka" level="INFO"> - <appender-ref ref="STDOUT"/> - <appender-ref ref="FILE"/> - </logger> - -</configuration> http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/f2311f25/conf/reference.conf ---------------------------------------------------------------------- diff --git a/conf/reference.conf b/conf/reference.conf deleted file mode 100644 index d13be52..0000000 --- a/conf/reference.conf +++ /dev/null @@ -1,129 +0,0 @@ -# This is the main configuration file for the application. -# ~~~~~ - -# Secret key -# ~~~~~ -# The secret key is used to secure cryptographics functions. -# -# This must be changed for production, but we recommend not changing it in this file. -# -# See http://www.playframework.com/documentation/latest/ApplicationSecret for more details. -application.secret="/`==g^yr2DNnZGK_L^rguLZeR`60uLOVgY@OhyTv:maatl:Tl>9or/d1xME3b/Pi" - -# The application languages -# ~~~~~ -application.langs="en" - -# Global object class -# ~~~~~ -# Define the Global object class for this application. -# Default to Global in the root package. -# application.global=Global - -# Router -# ~~~~~ -# Define the Router object to use for this application. -# This router will be looked up first when the application is starting up, -# so make sure this is the entry point. -# Furthermore, it's assumed your route file is named properly. -# So for an application router like `my.application.Router`, -# you may need to define a router file `conf/my.application.routes`. -# Default to Routes in the root package (and conf/routes) -# application.router=my.application.Routes - -# Database configuration -# ~~~~~ -# You can declare as many datasources as you want. -# By convention, the default datasource is named `default` -# -# db.default.driver=org.h2.Driver -# db.default.url="jdbc:h2:mem:play" -# db.default.user=sa -# db.default.password="" - -# Evolutions -# ~~~~~ -# You can disable evolutions if needed -# evolutionplugin=disabled - -# Logger -# ~~~~~ -# You can also configure logback (http://logback.qos.ch/), -# by providing an application-logger.xml file in the conf directory. - -# Root logger: -logger.root=ERROR - -# Logger used by the framework: -logger.play=INFO - -# Logger provided to your application: -logger.application=DEBUG - -# APP PHASE -phase=dev -host=localhost - -# DB -s2graph.models.table.name="models-dev" -hbase.zookeeper.quorum=${host} -db.default.url="jdbc:mysql://"${host}":3306/graph_dev" -# Query server -is.query.server=true -is.write.server=true -query.hard.limit=100000 - -# Local Cache -cache.ttl.seconds=60 -cache.max.size=100000 - -# HBASE -#hbase.client.operation.timeout=1000 -#async.hbase.client.flush.interval=100 -hbase.table.compression.algorithm="gz" - -# Asynchbase -hbase.client.retries.number=100 -hbase.rpcs.buffered_flush_interval=100 -hbase.rpc.timeout=0 -#hbase.nsre.high_watermark=1000000 -#hbase.timer.tick=5 -#hbase.timer.ticks_per_wheel=5 - -# Kafka -kafka.metadata.broker.list=${host} -kafka.producer.pool.size=0 - -# HTTP -parsers.text.maxLength=512K -parsers.json.maxLength=512K -trustxforwarded=false - -# Local Queue Actor -local.queue.actor.max.queue.size=100000 -local.queue.actor.rate.limit=1000000 - -# local retry number -max.retry.number=100 -max.back.off=50 -delete.all.fetch.size=10000 -hbase.fail.prob=-1.0 - -# max allowd edges for deleteAll is multiply of above two configuration. - -# set global obejct package, TODO: remove global -application.global=com.kakao.s2graph.rest.Global - -akka { - loggers = ["akka.event.slf4j.Slf4jLogger"] - loglevel = "DEBUG" -} - - -# Future cache. -future.cache.max.size=1000000 -future.cache.max.idle.ttl=10000 - -# Counter -redis.instances = [${host}] - http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/f2311f25/conf/routes ---------------------------------------------------------------------- diff --git a/conf/routes b/conf/routes deleted file mode 100644 index df4a1ee..0000000 --- a/conf/routes +++ /dev/null @@ -1,124 +0,0 @@ -# Routes -# This file defines all application routes (Higher priority routes first) -# ~~~~ - - -# publish -#POST /publish/:topic controllers.PublishController.publish(topic) -POST /publish/:topic controllers.PublishController.mutateBulk(topic) -POST /publishOnly/:topic controllers.PublishController.publishOnly(topic) - -#### Health Check -#GET /health_check.html controllers.Assets.at(path="/public", file="health_check.html") -GET /health_check.html controllers.ApplicationController.healthCheck() -PUT /health_check/:isHealthy controllers.ApplicationController.updateHealthCheck(isHealthy: Boolean) - -## Edge -POST /graphs/edges/insert controllers.EdgeController.inserts() -POST /graphs/edges/insertWithWait controllers.EdgeController.insertsWithWait() -POST /graphs/edges/insertBulk controllers.EdgeController.insertsBulk() -POST /graphs/edges/delete controllers.EdgeController.deletes() -POST /graphs/edges/deleteWithWait controllers.EdgeController.deletesWithWait() -POST /graphs/edges/deleteAll controllers.EdgeController.deleteAll() -POST /graphs/edges/update controllers.EdgeController.updates() -POST /graphs/edges/updateWithWait controllers.EdgeController.updatesWithWait() -POST /graphs/edges/increment controllers.EdgeController.increments() -POST /graphs/edges/incrementCount controllers.EdgeController.incrementCounts() -POST /graphs/edges/bulk controllers.EdgeController.mutateBulk() - -## Vertex -POST /graphs/vertices/insert controllers.VertexController.inserts() -POST /graphs/vertices/insertWithWait controllers.VertexController.insertsWithWait() -POST /graphs/vertices/insert/:serviceName/:columnName controllers.VertexController.insertsSimple(serviceName, columnName) -POST /graphs/vertices/delete controllers.VertexController.deletes() -POST /graphs/vertices/deleteWithWait controllers.VertexController.deletesWithWait() -POST /graphs/vertices/delete/:serviceName/:columnName controllers.VertexController.deletesSimple(serviceName, columnName) -POST /graphs/vertices/deleteAll controllers.VertexController.deletesAll() -POST /graphs/vertices/deleteAll/:serviceName/:columnName controllers.VertexController.deletesAllSimple(serviceName, columnName) - - -### SELECT Edges -POST /graphs/getEdges controllers.QueryController.getEdges() -POST /graphs/getEdges/grouped controllers.QueryController.getEdgesWithGrouping() -POST /graphs/getEdgesExcluded controllers.QueryController.getEdgesExcluded() -POST /graphs/getEdgesExcluded/grouped controllers.QueryController.getEdgesExcludedWithGrouping() -POST /graphs/checkEdges controllers.QueryController.checkEdges() - -### this will be deprecated -POST /graphs/getEdgesGrouped controllers.QueryController.getEdgesGrouped() -POST /graphs/getEdgesGroupedExcluded controllers.QueryController.getEdgesGroupedExcluded() -POST /graphs/getEdgesGroupedExcludedFormatted controllers.QueryController.getEdgesGroupedExcludedFormatted() -GET /graphs/getEdge/:srcId/:tgtId/:labelName/:direction controllers.QueryController.getEdge(srcId, tgtId, labelName, direction) - - -### SELECT Vertices -#POST /graphs/getVertex controllers.QueryController.getVertex() -POST /graphs/getVertices controllers.QueryController.getVertices() - - -#### ADMIN -POST /graphs/createService controllers.AdminController.createService() -GET /graphs/getService/:serviceName controllers.AdminController.getService(serviceName) -GET /graphs/getLabels/:serviceName controllers.AdminController.getLabels(serviceName) -POST /graphs/createLabel controllers.AdminController.createLabel() -POST /graphs/addIndex controllers.AdminController.addIndex() -GET /graphs/getLabel/:labelName controllers.AdminController.getLabel(labelName) -PUT /graphs/deleteLabel/:labelName controllers.AdminController.deleteLabel(labelName) - -POST /graphs/addProp/:labelName controllers.AdminController.addProp(labelName) -POST /graphs/createServiceColumn controllers.AdminController.createServiceColumn() -PUT /graphs/deleteServiceColumn/:serviceName/:columnName controllers.AdminController.deleteServiceColumn(serviceName, columnName) -POST /graphs/addServiceColumnProp/:serviceName/:columnName controllers.AdminController.addServiceColumnProp(serviceName, columnName) -POST /graphs/addServiceColumnProps/:serviceName/:columnName controllers.AdminController.addServiceColumnProps(serviceName, columnName) -GET /graphs/getServiceColumn/:serviceName/:columnName controllers.AdminController.getServiceColumn(serviceName, columnName) -POST /graphs/createHTable controllers.AdminController.createHTable() - - - - -#### TEST -#GET /graphs/testGetEdges/:label/:limit/:friendCntStep controllers.QueryController.testGetEdges(label, limit: Int, friendCntStep: Int) -#GET /graphs/testGetEdges2/:label1/:limit1/:label2/:limit2 controllers.QueryController.testGetEdges2(label1, limit1: Int, label2, limit2: Int) -#GET /graphs/testGetEdges3/:label1/:limit1/:label2/:limit2/:label3/:limit3 controllers.QueryController.testGetEdges3(label1, limit1: Int, label2, limit2: Int, label3, limit3: Int) -POST /ping controllers.TestController.ping() -POST /pingAsync controllers.TestController.pingAsync() -GET /graphs/testId controllers.TestController.getRandomId() - -# Map static resources from the /public folder to the /assets URL path -GET /images/*file controllers.Assets.at(path="/public/images", file) -GET /javascripts/*file controllers.Assets.at(path="/public/javascripts", file) -GET /stylesheets/*file controllers.Assets.at(path="/public/stylesheets", file) -GET /font-awesome-4.1.0/*file controllers.Assets.at(path="/public/font-awesome-4.1.0", file) -GET /swagger/*file controllers.Assets.at(path="/public/swagger-ui", file) - - -# AdminController API -#GET /admin/services controllers.AdminController.allServices -GET /admin/labels/:serviceName controllers.AdminController.getLabels(serviceName) -#POST /admin/labels/delete/:zkAddr/:tableName/:labelIds/:minTs/:maxTs controllers.AdminController.deleteEdges(zkAddr, tableName, labelIds, minTs: Long, maxTs: Long) -#POST /admin/labels/deleteAll/:zkAddr/:tableName/:minTs/:maxTs controllers.AdminController.deleteAllEdges(zkAddr, tableName, minTs: Long, maxTs: Long) -#POST /admin/swapLabel/:oldLabelName/:newLabelName controllers.AdminController.swapLabel(oldLabelName, newLabelName) -#GET /admin/reloadLabel/:labelName controllers.AdminController.reloadLabel(labelName) -#POST /admin/getEdges controllers.AdminController.getEdges() -POST /graphs/copyLabel/:oldLabelName/:newLabelName controllers.AdminController.copyLabel(oldLabelName, newLabelName) -POST /graphs/renameLabel/:oldLabelName/:newLabelName controllers.AdminController.renameLabel(oldLabelName, newLabelName) -POST /graphs/updateHTable/:labelName/:newHTableName controllers.AdminController.updateHTable(labelName, newHTableName) -PUT /graphs/loadCache controllers.AdminController.loadCache() - - -# Counter Admin API -POST /counter/v1/:service/:action controllers.CounterController.createAction(service, action) -GET /counter/v1/:service/:action controllers.CounterController.getAction(service, action) -PUT /counter/v1/:service/:action controllers.CounterController.updateAction(service, action) -PUT /counter/v1/:service/:action/prepare controllers.CounterController.prepareAction(service, action) -DELETE /counter/v1/:service/:action controllers.CounterController.deleteAction(service, action) - -# Counter API -GET /counter/v1/:service/:action/ranking controllers.CounterController.getRankingCountAsync(service, action) -DELETE /counter/v1/:service/:action/ranking controllers.CounterController.deleteRankingCount(service, action) -GET /counter/v1/:service/:action/:item controllers.CounterController.getExactCountAsync(service, action, item) -PUT /counter/v1/:service/:action/:item controllers.CounterController.incrementCount(service, action, item) -POST /counter/v1/mget controllers.CounterController.getExactCountAsyncMulti() - -# Experiment API -POST /graphs/experiment/:accessToken/:experimentName/:uuid controllers.ExperimentController.experiment(accessToken, experimentName, uuid) http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/f2311f25/conf/test.conf ---------------------------------------------------------------------- diff --git a/conf/test.conf b/conf/test.conf deleted file mode 100644 index c51baef..0000000 --- a/conf/test.conf +++ /dev/null @@ -1,2 +0,0 @@ -max.retry.number=10000 -hbase.fail.prob=0.1 http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/f2311f25/project/plugins.sbt ---------------------------------------------------------------------- diff --git a/project/plugins.sbt b/project/plugins.sbt index f09608e..4c90a1f 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -1,10 +1,10 @@ -// The Typesafe repository -resolvers += "Typesafe repository" at "http://repo.typesafe.com/typesafe/releases/" - // Use the Play sbt plugin for Play projects addSbtPlugin("com.typesafe.play" % "sbt-plugin" % "2.3.10") // http://www.scalastyle.org/sbt.html addSbtPlugin("org.scalastyle" %% "scalastyle-sbt-plugin" % "0.7.0") -resolvers += "sonatype-releases" at "https://oss.sonatype.org/content/repositories/releases/" +// sbt revolver +addSbtPlugin("io.spray" % "sbt-revolver" % "0.8.0") + +addSbtPlugin("com.typesafe.sbt" % "sbt-native-packager" % "1.0.3") http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/f2311f25/s2core/src/main/resources/logback.xml ---------------------------------------------------------------------- diff --git a/s2core/src/main/resources/logback.xml b/s2core/src/main/resources/logback.xml index 94f4920..543365c 100644 --- a/s2core/src/main/resources/logback.xml +++ b/s2core/src/main/resources/logback.xml @@ -1,15 +1,20 @@ <?xml version="1.0" encoding="UTF-8"?> <configuration> - <jmxConfigurator/> - <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"> - <encoder> - <pattern> - %d{ISO8601} %-5level [%thread] %logger{0}: %msg%n - </pattern> - </encoder> - </appender> - - <root level="info"> - <appender-ref ref="STDOUT"/> - </root> + <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"> + <withJansi>true</withJansi> + <encoder> + <pattern> + %d{ISO8601} [%highlight(%-5level)] [%gray(%logger{0})] [%thread] - %msg%n + </pattern> + </encoder> + </appender> + + <root level="ERROR"> + <appender-ref ref="STDOUT"/> + </root> + + <logger name="application" level="DEBUG"></logger> + + <logger name="error" level="DEBUG"></logger> </configuration> + http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/f2311f25/s2core/src/main/scala/com/kakao/s2graph/core/Graph.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/Graph.scala b/s2core/src/main/scala/com/kakao/s2graph/core/Graph.scala index 727a3be..111f3df 100644 --- a/s2core/src/main/scala/com/kakao/s2graph/core/Graph.scala +++ b/s2core/src/main/scala/com/kakao/s2graph/core/Graph.scala @@ -38,7 +38,10 @@ object Graph { "max.retry.number" -> java.lang.Integer.valueOf(100), "max.back.off" -> java.lang.Integer.valueOf(100), "hbase.fail.prob" -> java.lang.Double.valueOf(-0.1), - "delete.all.fetch.size" -> java.lang.Integer.valueOf(1000) + "delete.all.fetch.size" -> java.lang.Integer.valueOf(1000), + "future.cache.max.size" -> java.lang.Integer.valueOf(100000), + "future.cache.expire.after.write" -> java.lang.Integer.valueOf(10000), + "future.cache.expire.after.access" -> java.lang.Integer.valueOf(5000) ) var DefaultConfig: Config = ConfigFactory.parseMap(DefaultConfigs) @@ -315,7 +318,8 @@ class Graph(_config: Config)(implicit ec: ExecutionContext) { // val cache = CacheBuilder.newBuilder().maximumSize(cacheSize).build[java.lang.Integer, Seq[QueryResult]]() val vertexCache = CacheBuilder.newBuilder().maximumSize(cacheSize).build[java.lang.Integer, Option[Vertex]]() - Model(config) + Model.apply(config) + Model.loadCache() // TODO: Make storage client by config param val storage: Storage = new AsynchbaseStorage(config, vertexCache)(ec) @@ -345,5 +349,8 @@ class Graph(_config: Config)(implicit ec: ExecutionContext) { def incrementCounts(edges: Seq[Edge]): Future[Seq[(Boolean, Long)]] = storage.incrementCounts(edges) - def shutdown(): Unit = storage.flush() + def shutdown(): Unit = { + storage.flush() + Model.shutdown() + } }
