// Recovered from the deletion diff of s2rest_play/app/controllers/CounterController.scala
// (incubator-s2graph commit b8a15217, blob c2b4dc2, 747 lines in the original file).
package controllers

import com.kakao.s2graph.core.ExceptionHandler
import com.kakao.s2graph.core.ExceptionHandler.KafkaMessage
import com.kakao.s2graph.core.mysqls.Label
import config.CounterConfig
import models._
import org.apache.kafka.clients.producer.ProducerRecord
import play.api.Play
import play.api.libs.json.Reads._
import play.api.libs.json._
import play.api.mvc.{Action, Controller, Request}
import s2.config.S2CounterConfig
import s2.counter.core.TimedQualifier.IntervalUnit
import s2.counter.core._
import s2.counter.core.v1.{ExactStorageAsyncHBase, RankingStorageRedis}
import s2.counter.core.v2.{ExactStorageGraph, RankingStorageGraph}
import s2.models.Counter.ItemType
import s2.models.{Counter, CounterModel}
import s2.util.{CartesianProduct, ReduceMapValue, UnitConverter}

import scala.concurrent.Future
import scala.util.{Failure, Success, Try}

/**
 * REST endpoints for counter policies (create / get / update / prepare / delete),
 * exact counts, ranking counts and count increments.
 *
 * Created by hsleep([email protected]) on 15. 5. 22..
 */
object CounterController extends Controller {
  import play.api.libs.concurrent.Execution.Implicits.defaultContext

  val config = Play.current.configuration.underlying
  val s2config = new S2CounterConfig(config)

  // One exact counter and one ranking counter per storage version.
  private val exactCounterMap = Map(
    s2.counter.VERSION_1 -> new ExactCounter(config, new ExactStorageAsyncHBase(config)),
    s2.counter.VERSION_2 -> new ExactCounter(config, new ExactStorageGraph(config))
  )
  private val rankingCounterMap = Map(
    s2.counter.VERSION_1 -> new RankingCounter(config, new RankingStorageRedis(config)),
    s2.counter.VERSION_2 -> new RankingCounter(config, new RankingStorageGraph(config))
  )

  // Table-name prefix per storage version; also the source of truth for "is this version known".
  private val tablePrefixMap = Map (
    s2.counter.VERSION_1 -> "s2counter",
    s2.counter.VERSION_2 -> "s2counter_v2"
  )

  private def exactCounter(version: Byte): ExactCounter = exactCounterMap(version)
  private def rankingCounter(version: Byte): RankingCounter = rankingCounterMap(version)

  /** True when `version` has an entry in the version-keyed maps above. */
  private def isSupportedVersion(version: Byte): Boolean = tablePrefixMap.contains(version)

  /** 404 response shared by every action that looks up a policy by service/action. */
  private def policyNotFound(service: String, action: String) =
    NotFound(Json.toJson(Map("msg" -> s"$service.$action not found")))

  /** 400 response for a storage version that is not in the version maps. */
  private def unsupportedVersion(version: Byte) =
    BadRequest(Json.toJson(Map("msg" -> s"unsupported version $version")))

  lazy val counterModel = new CounterModel(config)

  /** Returns the query-string value for `key`, or `default` when the key is absent. */
  def getQueryString[T](key: String, default: String)(implicit request: Request[T]): String = {
    request.getQueryString(key).getOrElse(default)
  }

  /** JSON view of a counter policy; rate action/base ids are resolved to their service/action names. */
  implicit val counterWrites: Writes[Counter] = new Writes[Counter] {
    override def writes(o: Counter): JsValue = Json.obj(
      "version" -> o.version.toInt,
      "autoComb" -> o.autoComb,
      "dimension" -> o.dimension,
      "useProfile" -> o.useProfile,
      "bucketImpId" -> o.bucketImpId,
      "useRank" -> o.useRank,
      "intervalUnit" -> o.intervalUnit,
      "ttl" -> o.ttl,
      "dailyTtl" -> o.dailyTtl,
      "rateAction" -> o.rateActionId.flatMap { actionId =>
        counterModel.findById(actionId, useCache = false).map { actionPolicy =>
          Json.obj("service" -> actionPolicy.service, "action" -> actionPolicy.action)
        }
      },
      "rateBase" -> o.rateBaseId.flatMap { baseId =>
        counterModel.findById(baseId, useCache = false).map { basePolicy =>
          Json.obj("service" -> basePolicy.service, "action" -> basePolicy.action)
        }
      },
      "rateThreshold" -> o.rateThreshold
    )
  }

  /**
   * Creates the policy for `service`/`action` from the JSON body and prepares its storage.
   * Responds 200 "already exist" when the policy is present, and 400 when the requested
   * storage version is unknown (previously an uncaught NoSuchElementException).
   */
  def createAction(service: String, action: String) = Action(s2parse.json) { implicit request =>
    counterModel.findByServiceAction(service, action, useCache = false) match {
      case Some(_) =>
        Ok(Json.toJson(Map("msg" -> s"already exist $service/$action")))
      case None =>
        val body = request.body
        val version = (body \ "version").asOpt[Int].map(_.toByte).getOrElse(s2.counter.VERSION_2)
        if (!isSupportedVersion(version)) {
          unsupportedVersion(version)
        } else {
          val autoComb = (body \ "autoComb").asOpt[Boolean].getOrElse(true)
          val dimension = (body \ "dimension").asOpt[String].getOrElse("")
          val useProfile = (body \ "useProfile").asOpt[Boolean].getOrElse(false)
          val bucketImpId = (body \ "bucketImpId").asOpt[String]

          val useRank = (body \ "useRank").asOpt[Boolean].getOrElse(true)

          val intervalUnit = (body \ "intervalUnit").asOpt[String]
          // 2 day
          val ttl = (body \ "ttl").asOpt[Int].getOrElse(2 * 24 * 60 * 60)
          val dailyTtl = (body \ "dailyTtl").asOpt[Int]
          val regionMultiplier = (body \ "regionMultiplier").asOpt[Int].getOrElse(1)

          val rateAction = (body \ "rateAction").asOpt[Map[String, String]]
          val rateBase = (body \ "rateBase").asOpt[Map[String, String]]
          val rateThreshold = (body \ "rateThreshold").asOpt[Int]

          // Resolve the referenced rate policies; an unknown reference simply yields None.
          val rateActionId = for {
            actionMap <- rateAction
            rateService <- actionMap.get("service")
            rateActionName <- actionMap.get("action")
            ratePolicy <- counterModel.findByServiceAction(rateService, rateActionName)
          } yield ratePolicy.id
          val rateBaseId = for {
            actionMap <- rateBase
            rateService <- actionMap.get("service")
            rateActionName <- actionMap.get("action")
            ratePolicy <- counterModel.findByServiceAction(rateService, rateActionName)
          } yield ratePolicy.id

          // <prefix>_<service>_<ttl>[_<dailyTtl>]
          val hbaseTable = (Seq(tablePrefixMap(version), service, ttl) ++ dailyTtl).mkString("_")

          // find label: an existing label of the same name decides the item type
          val itemType = Label.findByName(action, useCache = false) match {
            case Some(label) =>
              ItemType.withName(label.tgtColumnType.toUpperCase)
            case None =>
              val strItemType = (body \ "itemType").asOpt[String].getOrElse("STRING")
              ItemType.withName(strItemType.toUpperCase)
          }
          val policy = Counter(useFlag = true, version, service, action, itemType, autoComb = autoComb, dimension,
            useProfile = useProfile, bucketImpId, useRank = useRank, ttl, dailyTtl, Some(hbaseTable), intervalUnit,
            rateActionId, rateBaseId, rateThreshold)

          // prepare exact storage
          exactCounter(version).prepare(policy)
          if (useRank) {
            // prepare ranking storage
            rankingCounter(version).prepare(policy)
          }
          counterModel.createServiceAction(policy)
          Ok(Json.toJson(Map("msg" -> s"created $service/$action")))
        }
    }
  }

  /** Returns the policy for `service`/`action` as JSON, or 404. */
  def getAction(service: String, action: String) = Action { request =>
    counterModel.findByServiceAction(service, action, useCache = false) match {
      case Some(policy) =>
        Ok(Json.toJson(policy))
      case None =>
        policyNotFound(service, action)
    }
  }

  /**
   * Updates the mutable fields of an existing policy. Every field missing from the body
   * keeps the value of the stored policy; version, item type, ttl and table are never changed.
   */
  def updateAction(service: String, action: String) = Action(s2parse.json) { request =>
    counterModel.findByServiceAction(service, action, useCache = false) match {
      case Some(oldPolicy) =>
        val body = request.body
        val autoComb = (body \ "autoComb").asOpt[Boolean].getOrElse(oldPolicy.autoComb)
        val dimension = (body \ "dimension").asOpt[String].getOrElse(oldPolicy.dimension)
        val useProfile = (body \ "useProfile").asOpt[Boolean].getOrElse(oldPolicy.useProfile)
        val bucketImpId = (body \ "bucketImpId").asOpt[String].orElse(oldPolicy.bucketImpId)

        val useRank = (body \ "useRank").asOpt[Boolean].getOrElse(oldPolicy.useRank)

        val intervalUnit = (body \ "intervalUnit").asOpt[String].orElse(oldPolicy.intervalUnit)

        val rateAction = (body \ "rateAction").asOpt[Map[String, String]]
        val rateBase = (body \ "rateBase").asOpt[Map[String, String]]
        val rateThreshold = (body \ "rateThreshold").asOpt[Int].orElse(oldPolicy.rateThreshold)

        val rateActionId = {
          for {
            actionMap <- rateAction
            rateService <- actionMap.get("service")
            rateActionName <- actionMap.get("action")
            ratePolicy <- counterModel.findByServiceAction(rateService, rateActionName, useCache = false)
          } yield ratePolicy.id
        }.orElse(oldPolicy.rateActionId)
        val rateBaseId = {
          for {
            actionMap <- rateBase
            rateService <- actionMap.get("service")
            rateActionName <- actionMap.get("action")
            ratePolicy <- counterModel.findByServiceAction(rateService, rateActionName, useCache = false)
          } yield ratePolicy.id
        }.orElse(oldPolicy.rateBaseId)

        // new counter
        val policy = Counter(id = oldPolicy.id, useFlag = oldPolicy.useFlag, oldPolicy.version, service, action, oldPolicy.itemType, autoComb = autoComb, dimension,
          useProfile = useProfile, bucketImpId, useRank = useRank, oldPolicy.ttl, oldPolicy.dailyTtl, oldPolicy.hbaseTable, intervalUnit,
          rateActionId, rateBaseId, rateThreshold)

        counterModel.updateServiceAction(policy)
        Ok(Json.toJson(Map("msg" -> s"updated $service/$action")))
      case None =>
        policyNotFound(service, action)
    }
  }

  /**
   * Prepares storage of another version for an existing policy (for data migration).
   * The stored policy itself is not modified. Responds 400 when "version" is missing,
   * not an integer, or not a known storage version (previously uncaught exceptions).
   */
  def prepareAction(service: String, action: String) = Action(s2parse.json) { request =>
    // for data migration
    counterModel.findByServiceAction(service, action, useCache = false) match {
      case Some(policy) =>
        (request.body \ "version").asOpt[Int].map(_.toByte) match {
          case None =>
            BadRequest(Json.toJson(Map("msg" -> "version is required")))
          case Some(version) if !isSupportedVersion(version) =>
            unsupportedVersion(version)
          case Some(version) if version != policy.version =>
            // change table name
            val newTableName = (Seq(tablePrefixMap(version), service, policy.ttl) ++ policy.dailyTtl).mkString("_")
            val newPolicy = policy.copy(version = version, hbaseTable = Some(newTableName))
            exactCounter(version).prepare(newPolicy)
            if (newPolicy.useRank) {
              rankingCounter(version).prepare(newPolicy)
            }
            Ok(Json.toJson(Map("msg" -> s"prepare storage v$version $service/$action")))
          case Some(version) =>
            Ok(Json.toJson(Map("msg" -> s"already prepared storage v$version $service/$action")))
        }
      case None =>
        policyNotFound(service, action)
    }
  }

  /** Destroys the exact (and, if used, ranking) storage of a policy and deletes the policy. */
  def deleteAction(service: String, action: String) = Action.apply {
    counterModel.findByServiceAction(service, action, useCache = false) match {
      case Some(policy) =>
        // Any failure propagates to the framework, exactly as the former Try-and-rethrow did.
        exactCounter(policy.version).destroy(policy)
        if (policy.useRank) {
          rankingCounter(policy.version).destroy(policy)
        }
        counterModel.deleteServiceAction(policy)
        Ok(Json.toJson(Map("msg" -> s"deleted $service/$action")))
      case None =>
        policyNotFound(service, action)
    }
  }

  /**
   * Exact counts of one item. Query parameters: interval (or step), limit, from, to, sum,
   * plus dimension filters given as ":<dimension>=v1,v2".
   */
  def getExactCountAsync(service: String, action: String, item: String) = Action.async { implicit request =>
    val intervalUnits = getQueryString("interval", getQueryString("step", "t")).split(',').toSeq
      .map(IntervalUnit.withName)
    val limit = getQueryString("limit", "1").toInt

    val qsSum = request.getQueryString("sum")

    val optFrom = request.getQueryString("from").filter(!_.isEmpty).map(UnitConverter.toMillis)
    val optTo = request.getQueryString("to").filter(!_.isEmpty).map(UnitConverter.toMillis)

    // An explicit from/to range is returned in full; otherwise the result is cut to `limit`.
    val limitOpt = if (optFrom.isDefined && optTo.isDefined) None else Some(limit)

    // find dimension; startsWith (unlike charAt(0)) is safe for an empty parameter name
    lazy val dimQueryValues = request.queryString.filterKeys { k => k.startsWith(":") }.map { case (k, v) =>
      (k.substring(1), v.mkString(",").split(',').filter(_.nonEmpty).toSet)
    }

    counterModel.findByServiceAction(service, action) match {
      case Some(policy) =>
        val tqs = TimedQualifier.getQualifiersToLimit(intervalUnits, limit, optFrom, optTo)
        val timeRange = TimedQualifier.getTimeRange(intervalUnits, limit, optFrom, optTo)
        // The sum function only applies when the first interval spans more than one qualifier.
        val jsFuture =
          if (qsSum.isDefined && tqs.headOption.exists(_.length > 1)) {
            getDecayedCountToJs(policy, item, timeRange, dimQueryValues, qsSum)
          } else {
            getExactCountToJs(policy, item, timeRange, limitOpt, dimQueryValues)
          }
        jsFuture.map { jsVal => Ok(jsVal) }
      case None =>
        Future.successful(policyNotFound(service, action))
    }
  }

  /**
   * Parses one element of the multi-get request body:
   * [{
   *   "service": , "action", "itemIds": [], "intervals": [string], "limit": int, "from": ts, "to": ts,
   *   "sum": string,
   *   "dimensions": [{"key": list[String]}]
   * }]
   * @return (service, action, itemIds, intervals, limit, from, to, dimensions, sum)
   */
  private def parseExactCountParam(jsValue: JsValue) = {
    val service = (jsValue \ "service").as[String]
    val action = (jsValue \ "action").as[String]
    val itemIds = (jsValue \ "itemIds").as[Seq[String]]
    val intervals = (jsValue \ "intervals").asOpt[Seq[String]].getOrElse(Seq("t")).distinct.map(IntervalUnit.withName)
    val limit = (jsValue \ "limit").asOpt[Int].getOrElse(1)
    val from = (jsValue \ "from").asOpt[Long]
    val to = (jsValue \ "to").asOpt[Long]
    val sum = (jsValue \ "sum").asOpt[String]
    val dimensions = {
      for {
        dimension <- (jsValue \ "dimensions").asOpt[Seq[JsObject]].getOrElse(Nil)
        (k, vs) <- dimension.fields
      } yield {
        k -> vs.as[Seq[String]].toSet
      }
    }.toMap
    (service, action, itemIds, intervals, limit, from, to, dimensions, sum)
  }

  /** Exact counts for many service/action/item combinations; unknown policies are skipped. */
  def getExactCountAsyncMulti = Action.async(s2parse.json) { implicit request =>
    val jsValue = request.body
    val futures = for {
      jsObject <- jsValue.asOpt[List[JsObject]].getOrElse(Nil)
      (service, action, itemIds, intervalUnits, limit, from, to, dimQueryValues, qsSum) = parseExactCountParam(jsObject)
      optFrom = from.map(UnitConverter.toMillis)
      optTo = to.map(UnitConverter.toMillis)
      policy <- counterModel.findByServiceAction(service, action).toSeq
      // Everything below is item-independent, so it is computed once per request element.
      tqs = TimedQualifier.getQualifiersToLimit(intervalUnits, limit, optFrom, optTo)
      timeRange = TimedQualifier.getTimeRange(intervalUnits, limit, optFrom, optTo)
      limitOpt = if (optFrom.isDefined && optTo.isDefined) None else Some(limit)
      useDecay = qsSum.isDefined && tqs.headOption.exists(_.length > 1)
      item <- itemIds
    } yield {
      if (useDecay) {
        getDecayedCountToJs(policy, item, timeRange, dimQueryValues, qsSum)
      } else {
        getExactCountToJs(policy, item, timeRange, limitOpt, dimQueryValues)
      }
    }
    Future.sequence(futures).map { rets =>
      Ok(Json.toJson(rets))
    }
  }

  /** Converts grouped counts to result items: newest timestamp first, optionally cut to `limitOpt`. */
  private [controllers] def fetchedToResult(fetchedCounts: FetchedCountsGrouped, limitOpt: Option[Int]): Seq[ExactCounterIntervalItem] = {
    for {
      ((interval, dimKeyValues), values) <- fetchedCounts.intervalWithCountMap
    } yield {
      val sortedItems = values.toSeq.sortBy { case (eq, v) => -eq.tq.ts }
      val limited = limitOpt.map(limit => sortedItems.take(limit)).getOrElse(sortedItems)
      val counterItems = limited.map { case (eq, value) =>
        ExactCounterItem(eq.tq.ts, value, value.toDouble)
      }
      ExactCounterIntervalItem(interval.toString, dimKeyValues, counterItems)
    }
  }.toSeq

  /** Converts decayed scores to result items, one interval item per qualifier. */
  private def decayedToResult(decayedCounts: DecayedCounts): Seq[ExactCounterIntervalItem] = {
    for {
      (eq, score) <- decayedCounts.qualifierWithCountMap
    } yield {
      ExactCounterIntervalItem(eq.tq.q.toString, eq.dimKeyValues, Seq(ExactCounterItem(eq.tq.ts, score.toLong, score)))
    }
  }.toSeq

  /** Fetches exact counts of `item` and renders them as JSON. */
  private def getExactCountToJs(policy: Counter,
                                item: String,
                                timeRange: Seq[(TimedQualifier, TimedQualifier)],
                                limitOpt: Option[Int],
                                dimQueryValues: Map[String, Set[String]]): Future[JsValue] = {
    exactCounter(policy.version).getCountsAsync(policy, Seq(item), timeRange, dimQueryValues).map { seq =>
      val items = seq.flatMap(fetched => fetchedToResult(fetched, limitOpt))
      Json.toJson(ExactCounterResult(ExactCounterResultMeta(policy.service, policy.action, item), items))
    }
  }

  /**
   * Fetches decayed (summed) counts of `item` and renders them as JSON.
   * When storage returns nothing the result carries the requested item and no interval items
   * (previously `seq.head` failed the future with NoSuchElementException).
   */
  private def getDecayedCountToJs(policy: Counter,
                                  item: String,
                                  timeRange: Seq[(TimedQualifier, TimedQualifier)],
                                  dimQueryValues: Map[String, Set[String]],
                                  qsSum: Option[String]): Future[JsValue] = {
    exactCounter(policy.version).getDecayedCountsAsync(policy, Seq(item), timeRange, dimQueryValues, qsSum).map { seq =>
      val result = seq.headOption match {
        case Some(decayedCounts) =>
          val meta = ExactCounterResultMeta(policy.service, policy.action, decayedCounts.exactKey.itemKey)
          ExactCounterResult(meta, decayedToResult(decayedCounts))
        case None =>
          ExactCounterResult(ExactCounterResultMeta(policy.service, policy.action, item), Nil)
      }
      Json.toJson(result)
    }
  }

  /**
   * Top-k ranking. Query parameters: interval (or step), limit, k, to, sum, plus dimension
   * values given as ":<dimension>=v1,v2" (the cartesian product of all values is queried).
   * Responds 501 when the storage reports the operation as unsupported.
   */
  def getRankingCountAsync(service: String, action: String) = Action.async { implicit request =>
    lazy val intervalUnits = getQueryString("interval", getQueryString("step", "t")).split(',').toSeq
      .map(IntervalUnit.withName)
    lazy val limit = getQueryString("limit", "1").toInt
    lazy val kValue = getQueryString("k", "10").toInt

    lazy val qsSum = request.getQueryString("sum")

    lazy val optTo = request.getQueryString("to").filter(!_.isEmpty).map(UnitConverter.toMillis)

    // find dimension; startsWith (unlike charAt(0)) is safe for an empty parameter name
    lazy val dimensionMap = request.queryString.filterKeys { k => k.startsWith(":") }.map { case (k, v) =>
      (k.substring(1), v.mkString(",").split(',').toList)
    }

    val dimensions = {
      for {
        values <- CartesianProduct(dimensionMap.values.toList).toSeq
      } yield {
        dimensionMap.keys.zip(values).toMap
      }
    }

    counterModel.findByServiceAction(service, action) match {
      case Some(policy) =>
        val tqs = TimedQualifier.getQualifiersToLimit(intervalUnits, limit, optTo)
        val dimKeys = {
          for {
            dimension <- dimensions
          } yield {
            dimension -> tqs.map(tq => RankingKey(policy.id, policy.version, ExactQualifier(tq, dimension)))
          }
        }

        def notImplemented(e: UnsupportedOperationException) =
          NotImplemented(Json.toJson(Map("msg" -> e.getMessage)))

        try {
          // if tqs has only 1 tq, do not apply sum function
          val rankResult = {
            if (tqs.length > 1 && qsSum.isDefined) {
              getSumRankCounterResultAsync(policy, dimKeys, kValue, qsSum)
            } else {
              // no summary
              Future.successful(getRankCounterResult(policy, dimKeys, kValue))
            }
          }

          rankResult.map { result =>
            Ok(Json.toJson(result))
          }.recover {
            // the same mapping for a failure that surfaces inside the future
            case e: UnsupportedOperationException => notImplemented(e)
          }
        } catch {
          // thrown synchronously while the result was being built
          case e: UnsupportedOperationException =>
            Future.successful(notImplemented(e))
        }
      case None =>
        Future.successful(policyNotFound(service, action))
    }
  }

  /** Deletes the ranking entries selected by interval/limit and the ":<dimension>" parameters. */
  def deleteRankingCount(service: String, action: String) = Action.async { implicit request =>
    lazy val intervalUnits = getQueryString("interval", getQueryString("step", "t")).split(',').toSeq
      .map(IntervalUnit.withName)
    lazy val limit = getQueryString("limit", "1").toInt

    // find dimension; startsWith (unlike charAt(0)) is safe for an empty parameter name
    lazy val dimensionMap = request.queryString.filterKeys { k => k.startsWith(":") }.map { case (k, v) =>
      (k.substring(1), v.mkString(",").split(',').toList)
    }

    val dimensions = {
      for {
        values <- CartesianProduct(dimensionMap.values.toList).toSeq
      } yield {
        dimensionMap.keys.zip(values).toMap
      }
    }

    Future {
      counterModel.findByServiceAction(service, action) match {
        case Some(policy) =>
          val tqs = TimedQualifier.getQualifiersToLimit(intervalUnits, limit)
          val keys = {
            for {
              dimension <- dimensions
              tq <- tqs
            } yield {
              RankingKey(policy.id, policy.version, ExactQualifier(tq, dimension))
            }
          }

          keys.foreach { key =>
            rankingCounter(policy.version).delete(key)
          }

          Ok(JsObject(
            Seq(
              ("msg", Json.toJson(s"delete ranking in $service.$action")),
              ("items", Json.toJson(keys.map { key =>
                s"${key.eq.tq.q}.${key.eq.tq.ts}.${key.eq.dimension}"
              }))
            )
          ))
        case None =>
          policyNotFound(service, action)
      }
    }
  }

  val reduceRateRankingValue = new ReduceMapValue[ExactKeyTrait, RateRankingValue](RateRankingValue.reduce, RateRankingValue(-1, -1))

  // change format: flatten the decayed counts into (key, score) pairs
  private def getDecayedCountsAsync(policy: Counter,
                                    items: Seq[String],
                                    timeRange: (TimedQualifier, TimedQualifier),
                                    dimension: Map[String, String],
                                    qsSum: Option[String]): Future[Seq[(ExactKeyTrait, Double)]] = {
    exactCounter(policy.version).getDecayedCountsAsync(policy, items, Seq(timeRange), dimension.mapValues(s => Set(s)), qsSum).map { seq =>
      for {
        DecayedCounts(exactKey, qcMap) <- seq
        value <- qcMap.values
      } yield {
        exactKey -> value
      }
    }
  }

  /**
   * Scores for a rate or trend counter: the decayed counts of the action policy over
   * `actionRange` combined with those of the base policy over `baseRange`.
   * Fails the future when either referenced policy cannot be resolved (previously `Option.get`).
   */
  private def rateScoresAsync(policy: Counter,
                              items: Seq[String],
                              actionRange: (TimedQualifier, TimedQualifier),
                              baseRange: (TimedQualifier, TimedQualifier),
                              dimension: Map[String, String],
                              qsSum: Option[String]) = {
    val policies = for {
      actionPolicy <- policy.rateActionId.flatMap(counterModel.findById(_))
      basePolicy <- policy.rateBaseId.flatMap(counterModel.findById(_))
    } yield (actionPolicy, basePolicy)

    policies match {
      case Some((actionPolicy, basePolicy)) =>
        val futureAction = getDecayedCountsAsync(actionPolicy, items, actionRange, dimension, qsSum).map { seq =>
          seq.map { case (k, score) =>
            ExactKey(policy, k.itemKey, checkItemType = false) -> RateRankingValue(score, -1)
          }.toMap
        }
        val futureBase = getDecayedCountsAsync(basePolicy, items, baseRange, dimension, qsSum).map { seq =>
          seq.map { case (k, score) =>
            ExactKey(policy, k.itemKey, checkItemType = false) -> RateRankingValue(-1, score)
          }.toMap
        }
        futureAction.zip(futureBase).map { case (actionScores, baseScores) =>
          reduceRateRankingValue(actionScores, baseScores).map { case (k, rrv) =>
            k -> rrv.rankingValue.score
          }.toSeq
        }
      case None =>
        Future.failed(new IllegalStateException(
          s"rate action/base policy not found. ${policy.service}.${policy.action}"))
    }
  }

  /**
   * Ranking over a summed time range: scores every candidate item with decayed counts
   * and returns the top `kValue` per dimension.
   * Each key sequence in `dimKeys` is expected to be non-empty (its first and last
   * qualifiers delimit the range).
   */
  def getSumRankCounterResultAsync(policy: Counter,
                                   dimKeys: Seq[(Map[String, String], Seq[RankingKey])],
                                   kValue: Int,
                                   qsSum: Option[String]): Future[RankCounterResult] = {
    val futures = {
      for {
        (dimension, keys) <- dimKeys
      } yield {
        val tqs = keys.map(rk => rk.eq.tq)
        val (tqFrom, tqTo) = (tqs.last, tqs.head)
        val items = rankingCounter(policy.version).getAllItems(keys, kValue)
        val future = {
          if (policy.isRateCounter) {
            rateScoresAsync(policy, items, (tqFrom, tqTo), (tqFrom, tqTo), dimension, qsSum)
          }
          else if (policy.isTrendCounter) {
            // trend: the base is the same policy pair, one step earlier
            rateScoresAsync(policy, items, (tqFrom, tqTo), (tqFrom.add(-1), tqTo.add(-1)), dimension, qsSum)
          }
          else {
            getDecayedCountsAsync(policy, items, (tqFrom, tqTo), dimension, qsSum)
          }
        }
        future.map { keyWithScore =>
          val ranking = keyWithScore.sortBy(-_._2).take(kValue)
          val rankCounterItems = ranking.zipWithIndex.map { case ((exactKey, score), idx) =>
            val realId = policy.itemType match {
              case ItemType.BLOB => exactCounter(policy.version).getBlobValue(policy, exactKey.itemKey)
                .getOrElse(throw new Exception(s"not found blob id. ${policy.service}.${policy.action} ${exactKey.itemKey}"))
              case _ => exactKey.itemKey
            }
            RankCounterItem(idx + 1, realId, score)
          }

          val eq = ExactQualifier(tqFrom, dimension)
          RankCounterDimensionItem(eq.tq.q.toString, eq.tq.ts, eq.dimension, -1, rankCounterItems)
        }
      }
    }

    Future.sequence(futures).map { dimensionResultList =>
      RankCounterResult(RankCounterResultMeta(policy.service, policy.action), dimensionResultList)
    }
  }

  /** Ranking without summation: the stored top-k of every ranking key, one result item per key. */
  def getRankCounterResult(policy: Counter, dimKeys: Seq[(Map[String, String], Seq[RankingKey])], kValue: Int): RankCounterResult = {
    val dimensionResultList = {
      for {
        (dimension, keys) <- dimKeys
        key <- keys
      } yield {
        val rankingValue = rankingCounter(policy.version).getTopK(key, kValue)
        val ranks = rankingValue.toSeq.flatMap { rValue =>
          rValue.values.zipWithIndex.map { case ((id, score), idx) =>
            val realId = policy.itemType match {
              case ItemType.BLOB =>
                exactCounter(policy.version)
                  .getBlobValue(policy, id)
                  .getOrElse(throw new Exception(s"not found blob id. ${policy.service}.${policy.action} $id"))
              case _ => id
            }
            RankCounterItem(idx + 1, realId, score)
          }
        }
        val eq = key.eq
        val tq = eq.tq
        RankCounterDimensionItem(tq.q.toString, tq.ts, eq.dimension, rankingValue.map(v => v.totalScore).getOrElse(0d), ranks)
      }
    }

    RankCounterResult(RankCounterResultMeta(policy.service, policy.action), dimensionResultList)
  }

  type Record = ProducerRecord[String, String]

  /**
   * Enqueues one increment message for `item`. Body (all fields optional):
   * {
   *   timestamp: Long      // defaults to the current time
   *   dimension: {}
   *   property: {}
   * }
   */
  def incrementCount(service: String, action: String, item: String) = Action.async(s2parse.json) { request =>
    Future {
      lazy val metaMap = Map("service" -> service, "action" -> action, "item" -> item)
      counterModel.findByServiceAction(service, action).map { _ =>
        val body = request.body
        // Every field is read with asOpt, which never throws, so no JsResultException handler is needed.
        val ts = (body \ "timestamp").asOpt[Long].getOrElse(System.currentTimeMillis()).toString
        val dimension = (body \ "dimension").asOpt[JsValue].getOrElse(Json.obj())
        val property = (body \ "property").asOpt[JsValue].getOrElse(Json.obj())

        val msg = List(ts, service, action, item, dimension, property).mkString("\t")

        // produce to kafka
        // hash partitioner by key
        ExceptionHandler.enqueue(KafkaMessage(new Record(CounterConfig.KAFKA_TOPIC_COUNTER, s"$ts.$item", msg)))

        Ok(Json.toJson(
          Map(
            "meta" -> metaMap
          )
        ))
      }.getOrElse {
        policyNotFound(service, action)
      }
    }
  }
}
// Recovered from the deletion diff of s2rest_play/app/controllers/EdgeController.scala
// (incubator-s2graph commit b8a15217, blob a8e1a41, 219 lines in the original file).
package controllers

import actors.QueueActor
import com.kakao.s2graph.core._
import com.kakao.s2graph.core.mysqls.{Label}
import com.kakao.s2graph.core.rest.RequestParser
import com.kakao.s2graph.core.utils.logger
import config.Config
import org.apache.kafka.clients.producer.ProducerRecord
import play.api.libs.json._
import play.api.mvc.{Controller, Result}

import scala.collection.Seq
import scala.concurrent.Future
import scala.util.control.NonFatal

/** REST endpoints that mutate edges/vertices and publish every mutation to the Kafka log topics. */
object EdgeController extends Controller {

  import ExceptionHandler._
  import controllers.ApplicationController._
  import play.api.libs.concurrent.Execution.Implicits._

  private val s2: Graph = com.kakao.s2graph.rest.Global.s2graph
  private val requestParser: RequestParser = com.kakao.s2graph.rest.Global.s2parser

  /** Raw string content for JSON strings, JSON text for everything else. */
  private def jsToStr(js: JsValue): String = js match {
    case JsString(s) => s
    case _ => js.toString()
  }

  /**
   * 500 response carrying the exception and its stack trace, one frame per line.
   * (Interpolating the StackTraceElement array directly only printed its identity string.)
   */
  private def stackTraceResponse(e: Throwable) = {
    val trace = e.getStackTrace.mkString("\n")
    InternalServerError(s"$e\n$trace")
  }

  /** Renders one JSON edge as a tab-separated log line; the direction column is added only when present. */
  def toTsv(jsValue: JsValue, op: String): String = {
    val ts = jsToStr(jsValue \ "timestamp")
    val from = jsToStr(jsValue \ "from")
    val to = jsToStr(jsValue \ "to")
    val label = jsToStr(jsValue \ "label")
    val props = (jsValue \ "props").asOpt[JsObject].getOrElse(Json.obj())

    (jsValue \ "direction").asOpt[String] match {
      case None => Seq(ts, op, "e", from, to, label, props).mkString("\t")
      case Some(dir) => Seq(ts, op, "e", from, to, label, props, dir).mkString("\t")
    }
  }

  /**
   * Parses edges from JSON, logs each one to Kafka and applies the non-async ones.
   * With `withWait` the response reports the storage result; otherwise edges are handed
   * to the queue actor and reported as accepted. Responds 401 on non-write servers.
   */
  def tryMutates(jsValue: JsValue, operation: String, withWait: Boolean = false): Future[Result] = {
    if (!Config.IS_WRITE_SERVER) Future.successful(Unauthorized)

    else {
      try {
        logger.debug(s"$jsValue")
        val (edges, jsOrgs) = requestParser.toEdgesWithOrg(jsValue, operation)

        for ((edge, orgJs) <- edges.zip(jsOrgs)) {
          if (edge.isAsync)
            ExceptionHandler.enqueue(toKafkaMessage(Config.KAFKA_LOG_TOPIC_ASYNC, edge, Option(toTsv(orgJs, operation))))
          else
            ExceptionHandler.enqueue(toKafkaMessage(Config.KAFKA_LOG_TOPIC, edge, Option(toTsv(orgJs, operation))))
        }

        // async edges are only logged, never written from here
        val edgesToStore = edges.filterNot(e => e.isAsync)

        if (withWait) {
          val rets = s2.mutateEdges(edgesToStore, withWait = true)
          rets.map(Json.toJson(_)).map(jsonResponse(_))
        } else {
          val rets = edgesToStore.map { edge => QueueActor.router ! edge; true }
          Future.successful(jsonResponse(Json.toJson(rets)))
        }
      } catch {
        case e: GraphExceptions.JsonParseException => Future.successful(BadRequest(s"$e"))
        case e: Exception =>
          logger.error(s"tryMutates: $e", e)
          Future.successful(stackTraceResponse(e))
      }
    }
  }

  /**
   * Parses newline-separated element lines, logs each element to Kafka and applies the
   * non-async ones. Responds 401 on non-write servers: the guard used to lack its `else`,
   * so its result was discarded and the mutation ran anyway.
   */
  def mutateAndPublish(str: String, withWait: Boolean = false): Future[Result] = {
    if (!Config.IS_WRITE_SERVER) Future.successful(Unauthorized)

    else {
      logger.debug(s"$str")
      val edgeStrs = str.split("\\n")

      // NOTE(review): these counters are updated below but never read or reported.
      var vertexCnt = 0L
      var edgeCnt = 0L
      try {
        val elements =
          for (edgeStr <- edgeStrs; line <- GraphUtil.parseString(edgeStr); element <- Graph.toGraphElement(line)) yield {
            element match {
              case v: Vertex => vertexCnt += 1
              case e: Edge => edgeCnt += 1
            }
            if (element.isAsync) {
              ExceptionHandler.enqueue(toKafkaMessage(Config.KAFKA_LOG_TOPIC_ASYNC, element, Some(line)))
            } else {
              ExceptionHandler.enqueue(toKafkaMessage(Config.KAFKA_LOG_TOPIC, element, Some(line)))
            }
            element
          }

        //FIXME:
        val elementsToStore = elements.filterNot(e => e.isAsync)
        if (withWait) {
          val rets = s2.mutateElements(elementsToStore, withWait)
          rets.map(Json.toJson(_)).map(jsonResponse(_))
        } else {
          val rets = elementsToStore.map { element => QueueActor.router ! element; true }
          Future.successful(jsonResponse(Json.toJson(rets)))
        }
      } catch {
        case e: GraphExceptions.JsonParseException => Future.successful(BadRequest(s"$e"))
        // NonFatal lets OutOfMemoryError, InterruptedException and the like propagate
        case NonFatal(e) =>
          logger.error(s"mutateAndPublish: $e", e)
          Future.successful(stackTraceResponse(e))
      }
    }
  }

  def mutateBulk() = withHeaderAsync(parse.text) { request =>
    mutateAndPublish(request.body, withWait = false)
  }

  def mutateBulkWithWait() = withHeaderAsync(parse.text) { request =>
    mutateAndPublish(request.body, withWait = true)
  }

  def inserts() = withHeaderAsync(jsonParser) { request =>
    tryMutates(request.body, "insert")
  }

  def insertsWithWait() = withHeaderAsync(jsonParser) { request =>
    tryMutates(request.body, "insert", withWait = true)
  }

  def insertsBulk() = withHeaderAsync(jsonParser) { request =>
    tryMutates(request.body, "insertBulk")
  }

  def deletes() = withHeaderAsync(jsonParser) { request =>
    tryMutates(request.body, "delete")
  }

  def deletesWithWait() = withHeaderAsync(jsonParser) { request =>
    tryMutates(request.body, "delete", withWait = true)
  }

  def updates() = withHeaderAsync(jsonParser) { request =>
    tryMutates(request.body, "update")
  }

  def updatesWithWait() = withHeaderAsync(jsonParser) { request =>
    tryMutates(request.body, "update", withWait = true)
  }

  def increments() = withHeaderAsync(jsonParser) { request =>
    tryMutates(request.body, "increment")
  }

  def incrementsWithWait() = withHeaderAsync(jsonParser) { request =>
    tryMutates(request.body, "increment", withWait = true)
  }

  /** Increments edge counts and reports, per edge, the success flag and the resulting count. */
  def incrementCounts() = withHeaderAsync(jsonParser) { request =>
    val jsValue = request.body
    val edges = requestParser.toEdges(jsValue, "incrementCount")

    s2.incrementCounts(edges, withWait = true).map { results =>
      val json = results.map { case (isSuccess, resultCount) =>
        Json.obj("success" -> isSuccess, "result" -> resultCount)
      }

      jsonResponse(Json.toJson(json))
    }
  }

  def deleteAll() = withHeaderAsync(jsonParser) { request =>
//    deleteAllInner(request.body, withWait = false)
    deleteAllInner(request.body, withWait = true)
  }

  /**
   * Deletes all adjacent edges for every delete request in the JSON array.
   * Requests without labels or ids are treated as already done. With `withWait = false`
   * the deletion is still started, but its outcome is not awaited or reported.
   */
  def deleteAllInner(jsValue: JsValue, withWait: Boolean) = {

    /** logging for delete all request */
    def enqueueLogMessage(ids: Seq[JsValue], labels: Seq[Label], ts: Long, direction: String, topicOpt: Option[String]) = {
      val kafkaMessages = for {
        id <- ids
        label <- labels
      } yield {
        val tsv = Seq(ts, "deleteAll", "e", jsToStr(id), jsToStr(id), label.label, "{}", direction).mkString("\t")
        val topic = topicOpt.getOrElse {
          if (label.isAsync) Config.KAFKA_LOG_TOPIC_ASYNC else Config.KAFKA_LOG_TOPIC
        }

        // no record key is supplied
        KafkaMessage(new ProducerRecord[Key, Val](topic, null, tsv))
      }

      ExceptionHandler.enqueues(kafkaMessages)
    }

    def deleteEach(labels: Seq[Label], direction: String, ids: Seq[JsValue], ts: Long, vertices: Seq[Vertex]) = {
      enqueueLogMessage(ids, labels, ts, direction, None)
      val future = s2.deleteAllAdjacentEdges(vertices.toList, labels, GraphUtil.directions(direction), ts)
      if (withWait) {
        future
      } else {
        Future.successful(true)
      }
    }

    val deleteFutures = jsValue.as[Seq[JsValue]].map { json =>
      val (labels, direction, ids, ts, vertices) = requestParser.toDeleteParam(json)
      if (labels.isEmpty || ids.isEmpty) Future.successful(true)
      else deleteEach(labels, direction, ids, ts, vertices)
    }

    val deleteResults = Future.sequence(deleteFutures)
    deleteResults.map { rst =>
      logger.debug(s"deleteAllInner: $rst")
      Ok(s"deleted... ${rst.toString()}")
    }
  }
}

// Recovered from the deletion diff of s2rest_play/app/controllers/ExperimentController.scala
// (incubator-s2graph commit b8a15217, blob e48b0f1, 24 lines in the original file).
package controllers


import com.kakao.s2graph.core.mysqls.Experiment
import com.kakao.s2graph.core.rest.RestHandler
import com.kakao.s2graph.core.utils.logger
import play.api.mvc._
import scala.concurrent.ExecutionContext.Implicits.global

/** Forwards an experiment request to the REST handler and adds a "result_size" response header. */
object ExperimentController extends Controller {
  private val rest: RestHandler = com.kakao.s2graph.rest.Global.s2rest

  import ApplicationController._

  def experiment(accessToken: String, experimentName: String, uuid: String) = withHeaderAsync(jsonText) { request =>
    val body = request.body

    val res = rest.doPost(request.uri, body, request.headers.get(Experiment.impressionKey))
    res.body.map { case js =>
      val headers = res.headers :+ ("result_size" -> rest.calcSize(js).toString)
      jsonResponse(js, headers: _*)
    } recoverWith ApplicationController.requestFallback(body)
  }
}

// Recovered from the deletion diff of s2rest_play/app/controllers/JsonBodyParser.scala
// (incubator-s2graph commit b8a15217, blob 4339eb4, 86 lines in the original file).
package controllers

import com.kakao.s2graph.core.utils.logger
import play.api.Play
import play.api.libs.iteratee.{Execution, Iteratee}
import play.api.libs.json.{JsValue, Json}
import play.api.mvc._

// The import below continues on the following line of the dump.
import
scala.concurrent.Future -import scala.util.control.NonFatal - -object s2parse extends BodyParsers { - - import parse._ - - val defaultMaxTextLength = 1024 * 512 - val defaultMaxJsonLength = 1024 * 512 - - def json: BodyParser[JsValue] = json(defaultMaxTextLength) - - /** - * parseText with application/json header for Pre-Process text - */ - def jsonText: BodyParser[String] = when( - _.contentType.exists(m => m.equalsIgnoreCase("text/json") || m.equalsIgnoreCase("application/json")), - jsonText(defaultMaxTextLength), - createBadResult("Expecting text/json or application/json body") - ) - - private def jsonText(maxLength: Int): BodyParser[String] = BodyParser("json, maxLength=" + maxLength) { request => - import play.api.libs.iteratee.Execution.Implicits.trampoline - import play.api.libs.iteratee.Traversable - - Traversable.takeUpTo[Array[Byte]](maxLength) - .transform(Iteratee.consume[Array[Byte]]().map(c => new String(c, "UTF-8"))) - .flatMap(Iteratee.eofOrElse(Results.EntityTooLarge)) - } - - def json(maxLength: Int): BodyParser[JsValue] = when( - _.contentType.exists(m => m.equalsIgnoreCase("text/json") || m.equalsIgnoreCase("application/json")), - tolerantJson(maxLength), - createBadResult("Expecting text/json or application/json body") - ) - - def tolerantJson(maxLength: Int): BodyParser[JsValue] = - tolerantBodyParser[JsValue]("json", maxLength, "Invalid Json") { (request, bytes) => - // Encoding notes: RFC 4627 requires that JSON be encoded in Unicode, and states that whether that's - // UTF-8, UTF-16 or UTF-32 can be auto detected by reading the first two bytes. So we ignore the declared - // charset and don't decode, we passing the byte array as is because Jackson supports auto detection. 
- Json.parse(bytes) - } - - private def tolerantBodyParser[A](name: String, maxLength: Int, errorMessage: String)(parser: (RequestHeader, Array[Byte]) => A): BodyParser[A] = - BodyParser(name + ", maxLength=" + maxLength) { request => - import play.api.libs.iteratee.Execution.Implicits.trampoline - import play.api.libs.iteratee.Traversable - - import scala.util.control.Exception._ - - val bodyParser: Iteratee[Array[Byte], Either[Result, Either[Future[Result], A]]] = - Traversable.takeUpTo[Array[Byte]](maxLength).transform( - Iteratee.consume[Array[Byte]]().map { bytes => - allCatch[A].either { - parser(request, bytes) - }.left.map { - case NonFatal(e) => - val txt = new String(bytes) - logger.error(s"$errorMessage: $txt", e) - createBadResult(s"$errorMessage: $e")(request) - case t => throw t - } - } - ).flatMap(Iteratee.eofOrElse(Results.EntityTooLarge)) - - bodyParser.mapM { - case Left(tooLarge) => Future.successful(Left(tooLarge)) - case Right(Left(badResult)) => badResult.map(Left.apply) - case Right(Right(body)) => Future.successful(Right(body)) - } - } - - private def createBadResult(msg: String): RequestHeader => Future[Result] = { request => - Play.maybeApplication.map(_.global.onBadRequest(request, msg)) - .getOrElse(Future.successful(Results.BadRequest)) - } -} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2rest_play/app/controllers/PublishController.scala ---------------------------------------------------------------------- diff --git a/s2rest_play/app/controllers/PublishController.scala b/s2rest_play/app/controllers/PublishController.scala deleted file mode 100644 index b1495d5..0000000 --- a/s2rest_play/app/controllers/PublishController.scala +++ /dev/null @@ -1,54 +0,0 @@ -package controllers - -import com.kakao.s2graph.core.ExceptionHandler -import config.Config -import org.apache.kafka.clients.producer.ProducerRecord -import play.api.mvc._ - -import scala.concurrent.Future - -object PublishController extends Controller { 
- - import ApplicationController._ - import ExceptionHandler._ - import play.api.libs.concurrent.Execution.Implicits._ - - /** - * never check validation on string. just redirect strings to kafka. - */ - val serviceNotExistException = new RuntimeException(s"service is not created in s2graph. create service first.") - - // private def toService(topic: String): String = { - // Service.findByName(topic).map(service => s"${service.serviceName}-${Config.PHASE}").getOrElse(throw serviceNotExistException) - // } - def publishOnly(topic: String) = withHeaderAsync(parse.text) { request => - if (!Config.IS_WRITE_SERVER) Future.successful(UNAUTHORIZED) - // val kafkaTopic = toService(topic) - val strs = request.body.split("\n") - strs.foreach(str => { - val keyedMessage = new ProducerRecord[Key, Val](Config.KAFKA_LOG_TOPIC, str) - // val keyedMessage = new ProducerRecord[Key, Val](kafkaTopic, s"$str") - // logger.debug(s"$kafkaTopic, $str") - ExceptionHandler.enqueue(KafkaMessage(keyedMessage)) - }) - Future.successful( - Ok("publish success.\n").withHeaders(CONNECTION -> "Keep-Alive", "Keep-Alive" -> "timeout=10, max=10") - ) - // try { - // - // } catch { - // case e: Exception => Future.successful(BadRequest(e.getMessage)) - // } - } - - def publish(topic: String) = publishOnly(topic) - - // def mutateBulk(topic: String) = Action.async(parse.text) { request => - // EdgeController.mutateAndPublish(Config.KAFKA_LOG_TOPIC, Config.KAFKA_FAIL_TOPIC, request.body).map { result => - // result.withHeaders(CONNECTION -> "Keep-Alive", "Keep-Alive" -> "timeout=10, max=10") - // } - // } - def mutateBulk(topic: String) = withHeaderAsync(parse.text) { request => - EdgeController.mutateAndPublish(request.body) - } -} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2rest_play/app/controllers/QueryController.scala ---------------------------------------------------------------------- diff --git a/s2rest_play/app/controllers/QueryController.scala 
// ----------------------------------------------------------------------
// File: s2rest_play/app/controllers/QueryController.scala
// diff header (continued from the previous line of this archive):
//   b/s2rest_play/app/controllers/QueryController.scala
//   deleted file mode 100644, index d133370..0000000, @@ -1,52 +0,0 @@
// ----------------------------------------------------------------------
package controllers

import com.kakao.s2graph.core._
import com.kakao.s2graph.core.mysqls.Experiment
import com.kakao.s2graph.core.rest.RestHandler
import play.api.libs.json.{Json}
import play.api.mvc._

import scala.language.postfixOps

/**
 * Read-side HTTP endpoints.
 *
 * Every action reads the request body as text (`jsonText`) and hands the work to the shared
 * [[RestHandler]]; the JSON it produces is returned together with a `result_size` header
 * computed by `rest.calcSize`.
 */
object QueryController extends Controller with JSONParser {

  import ApplicationController._
  import play.api.libs.concurrent.Execution.Implicits.defaultContext

  private val rest: RestHandler = com.kakao.s2graph.rest.Global.s2rest

  /**
   * Forwards the request (uri, raw body and the optional `Experiment.impressionKey` header)
   * to `rest.doPost` and renders the resulting JSON.
   *
   * A failed handler future is passed to `ApplicationController.requestFallback`, which
   * receives the original request body.
   *
   * @param request request whose body has already been read as a String
   */
  def delegate(request: Request[String]) = {
    val impressionHeader = request.headers.get(Experiment.impressionKey)
    val handled = rest.doPost(request.uri, request.body, impressionHeader)

    handled.body
      .map(js => jsonResponse(js, "result_size" -> rest.calcSize(js).toString))
      .recoverWith(ApplicationController.requestFallback(request.body))
  }

  // All of the endpoints below share the same implementation: `delegate`.

  def getEdges() = withHeaderAsync(jsonText)(delegate)

  def getEdgesWithGrouping() = withHeaderAsync(jsonText)(delegate)

  def getEdgesExcluded() = withHeaderAsync(jsonText)(delegate)

  def getEdgesExcludedWithGrouping() = withHeaderAsync(jsonText)(delegate)

  def checkEdges() = withHeaderAsync(jsonText)(delegate)

  def getEdgesGrouped() = withHeaderAsync(jsonText)(delegate)

  def getEdgesGroupedExcluded() = withHeaderAsync(jsonText)(delegate)

  def getEdgesGroupedExcludedFormatted() = withHeaderAsync(jsonText)(delegate)

  /**
   * Checks a single edge identified by its endpoints, label and direction.
   *
   * The four arguments are wrapped into a one-element JSON array and passed to
   * `rest.checkEdges`; the response is rendered exactly like [[delegate]] does.
   */
  def getEdge(srcId: String, tgtId: String, labelName: String, direction: String) =
    withHeaderAsync(jsonText) { request =>
      val edgeSpec = Json.obj("label" -> labelName, "direction" -> direction, "from" -> srcId, "to" -> tgtId)
      val params = Json.arr(edgeSpec)

      rest.checkEdges(params).body
        .map(js => jsonResponse(js, "result_size" -> rest.calcSize(js).toString))
        .recoverWith(ApplicationController.requestFallback(request.body))
    }

  def getVertices() = withHeaderAsync(jsonText)(delegate)
}
// ----------------------------------------------------------------------
// File: s2rest_play/app/controllers/TestController.scala
// http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2rest_play/app/controllers/TestController.scala
// diff --git a/s2rest_play/app/controllers/TestController.scala b/s2rest_play/app/controllers/TestController.scala
//   deleted file mode 100644, index 8558ae5..0000000, @@ -1,24 +0,0 @@
// ----------------------------------------------------------------------
package controllers

import play.api.mvc.{Action, Controller}
import util.TestDataLoader

import scala.concurrent.Future


/** Small diagnostic endpoints: a random test id and two ping variants. */
object TestController extends Controller {
  import ApplicationController._

  /** Responds with `TestDataLoader.randomId` rendered as text. */
  def getRandomId() = withHeader(parse.anyContent) { request =>
    val id = TestDataLoader.randomId
    Ok(s"${id}")
  }

  /** Asynchronous ping; the body is parsed with `parse.json` before "Pong" is returned. */
  def pingAsync() = Action.async(parse.json) { request =>
    Future.successful(Ok("Pong\n"))
  }

  /** Synchronous ping; the body is parsed with `parse.json` before "Pong" is returned. */
  def ping() = Action(parse.json) { request =>
    Ok("Pong\n")
  }
}

// ----------------------------------------------------------------------
// File: s2rest_play/app/controllers/VertexController.scala
// http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2rest_play/app/controllers/VertexController.scala
// diff --git a/s2rest_play/app/controllers/VertexController.scala b/s2rest_play/app/controllers/VertexController.scala
//   deleted file mode 100644, index 977c2fc..0000000, @@ -1,86 +0,0 @@
// ----------------------------------------------------------------------
package controllers


import actors.QueueActor
import com.kakao.s2graph.core.rest.RequestParser
import com.kakao.s2graph.core.utils.logger
import com.kakao.s2graph.core.{ExceptionHandler, Graph, GraphExceptions}
import config.Config
import play.api.libs.json.{JsValue, Json}
import play.api.mvc.{Controller, Result}

import scala.concurrent.Future

/** Write-side HTTP endpoints for vertices (insert / delete / deleteAll). */
object VertexController extends Controller {
  private val s2: Graph = com.kakao.s2graph.rest.Global.s2graph
  private val requestParser: RequestParser = com.kakao.s2graph.rest.Global.s2parser

  import ExceptionHandler._
  import controllers.ApplicationController._
  import play.api.libs.concurrent.Execution.Implicits._

  /**
   * Parses `jsValue` into vertices, logs each of them to Kafka and applies the mutation.
   *
   * @param jsValue        request payload describing the vertices
   * @param operation      operation name handed to `requestParser.toVertices`
   *                       (the endpoints below pass "insert", "delete" or "deleteAll")
   * @param serviceNameOpt optional service name forwarded to the parser
   * @param columnNameOpt  optional column name forwarded to the parser
   * @param withWait       when true the response is built from the result of
   *                       `s2.mutateVertices`; when false the vertices are sent to
   *                       `QueueActor.router` and `true` is reported for each of them
   * @return `Unauthorized` when this node is not a write server, `BadRequest` when the
   *         payload cannot be parsed, `InternalServerError` (with the stack trace as body)
   *         on any other exception thrown while building the response, otherwise the JSON
   *         response
   */
  def tryMutates(jsValue: JsValue,
                 operation: String,
                 serviceNameOpt: Option[String] = None,
                 columnNameOpt: Option[String] = None,
                 withWait: Boolean = false): Future[Result] = {
    if (!Config.IS_WRITE_SERVER) Future.successful(Unauthorized)
    else {
      try {
        val vertices = requestParser.toVertices(jsValue, operation, serviceNameOpt, columnNameOpt)

        // Every parsed vertex is logged; only the topic depends on vertex.isAsync.
        for (vertex <- vertices) {
          val topic = if (vertex.isAsync) Config.KAFKA_LOG_TOPIC_ASYNC else Config.KAFKA_LOG_TOPIC
          ExceptionHandler.enqueue(toKafkaMessage(topic, vertex, None))
        }

        //FIXME:
        // Async vertices are only logged above; they are not stored from here.
        val verticesToStore = vertices.filterNot(v => v.isAsync)

        if (withWait) {
          val rets = s2.mutateVertices(verticesToStore, withWait = true)
          rets.map(Json.toJson(_)).map(jsonResponse(_))
        } else {
          val rets = verticesToStore.map { vertex => QueueActor.router ! vertex; true }
          Future.successful(jsonResponse(Json.toJson(rets)))
        }
      } catch {
        // Was s"e", which sent the literal letter "e" to the client instead of the error.
        case e: GraphExceptions.JsonParseException => Future.successful(BadRequest(s"$e"))
        case e: Exception =>
          logger.error(s"[Failed] tryMutates", e)
          // Interpolating the array itself only yields "[Ljava.lang.StackTraceElement;@...".
          Future.successful(InternalServerError(e.getStackTrace.mkString("\n")))
      }
    }
  }

  def inserts() = withHeaderAsync(jsonParser) { request =>
    tryMutates(request.body, "insert")
  }

  def insertsWithWait() = withHeaderAsync(jsonParser) { request =>
    tryMutates(request.body, "insert", withWait = true)
  }

  def insertsSimple(serviceName: String, columnName: String) = withHeaderAsync(jsonParser) { request =>
    tryMutates(request.body, "insert", Some(serviceName), Some(columnName))
  }

  def deletes() = withHeaderAsync(jsonParser) { request =>
    tryMutates(request.body, "delete")
  }

  def deletesWithWait() = withHeaderAsync(jsonParser) { request =>
    tryMutates(request.body, "delete", withWait = true)
  }

  def deletesSimple(serviceName: String, columnName: String) = withHeaderAsync(jsonParser) { request =>
    tryMutates(request.body, "delete", Some(serviceName), Some(columnName))
  }

  def deletesAll() = withHeaderAsync(jsonParser) { request =>
    tryMutates(request.body, "deleteAll")
  }

  def deletesAllSimple(serviceName: String, columnName: String) = withHeaderAsync(jsonParser) { request =>
    tryMutates(request.body, "deleteAll", Some(serviceName), Some(columnName))
  }

}

// ----------------------------------------------------------------------
// File: s2rest_play/app/models/ExactCounterItem.scala
// http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2rest_play/app/models/ExactCounterItem.scala
// diff --git a/s2rest_play/app/models/ExactCounterItem.scala b/s2rest_play/app/models/ExactCounterItem.scala
//   deleted file mode 100644, index 244c046..0000000, @@ -1,38 +0,0 @@
// ----------------------------------------------------------------------
package models

import play.api.libs.json.{Json, Writes}

/**
 * Created by alec on 15. 4. 15..
 */
case class ExactCounterItem(ts: Long, count: Long, score: Double)

case class ExactCounterIntervalItem(interval: String, dimension: Map[String, String], counter: Seq[ExactCounterItem])

case class ExactCounterResultMeta(service: String, action: String, item: String)

case class ExactCounterResult(meta: ExactCounterResultMeta, data: Seq[ExactCounterIntervalItem])

object ExactCounterItem {
  /** Hand-written writer: adds a derived "time" field (`tsFormat` applied to `ts`). */
  implicit val writes = new Writes[ExactCounterItem] {
    def writes(item: ExactCounterItem) = Json.obj(
      "ts" -> item.ts,
      "time" -> tsFormat.format(item.ts),
      "count" -> item.count,
      "score" -> item.score
    )
  }
  implicit val reads = Json.reads[ExactCounterItem]
}

object ExactCounterIntervalItem {
  implicit val format = Json.format[ExactCounterIntervalItem]
}

object ExactCounterResultMeta {
  implicit val format = Json.format[ExactCounterResultMeta]
}

object ExactCounterResult {
  implicit val formats = Json.format[ExactCounterResult]
}
// ----------------------------------------------------------------------
// File: s2rest_play/app/models/RankCounterItem.scala
// http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2rest_play/app/models/RankCounterItem.scala
// diff --git a/s2rest_play/app/models/RankCounterItem.scala b/s2rest_play/app/models/RankCounterItem.scala
//   deleted file mode 100644, index aaa7df7..0000000, @@ -1,40 +0,0 @@
// ----------------------------------------------------------------------
package models

import play.api.libs.json.{Json, Writes}

/**
 * Created by alec on 15. 4. 15..
 */
case class RankCounterItem(rank: Int, id: String, score: Double)

case class RankCounterDimensionItem(interval: String, ts: Long, dimension: String, total: Double, ranks: Seq[RankCounterItem])

case class RankCounterResultMeta(service: String, action: String)

case class RankCounterResult(meta: RankCounterResultMeta, data: Seq[RankCounterDimensionItem])

object RankCounterItem {
  implicit val format = Json.format[RankCounterItem]
}

object RankCounterDimensionItem {
  /** Hand-written writer: adds a derived "time" field (`tsFormat` applied to `ts`). */
  implicit val writes = new Writes[RankCounterDimensionItem] {
    def writes(item: RankCounterDimensionItem) = Json.obj(
      "interval" -> item.interval,
      "ts" -> item.ts,
      "time" -> tsFormat.format(item.ts),
      "dimension" -> item.dimension,
      "total" -> item.total,
      "ranks" -> item.ranks
    )
  }
  implicit val reads = Json.reads[RankCounterDimensionItem]
}

object RankCounterResultMeta {
  implicit val format = Json.format[RankCounterResultMeta]
}

object RankCounterResult {
  implicit val format = Json.format[RankCounterResult]
}

// ----------------------------------------------------------------------
// File: s2rest_play/app/models/package.scala
// http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2rest_play/app/models/package.scala
// diff --git a/s2rest_play/app/models/package.scala b/s2rest_play/app/models/package.scala
//   deleted file mode 100644, index 17fa8e1..0000000, @@ -1,8 +0,0 @@
// ----------------------------------------------------------------------
import java.text.SimpleDateFormat

/**
 * Created by alec on 15. 4. 20..
 */
package object models {
  // A `def`, so every call gets its own SimpleDateFormat instance.
  def tsFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
}

// ----------------------------------------------------------------------
// File: s2rest_play/app/org/apache/s2graph/rest/play/Bootstrap.scala
// http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2rest_play/app/org/apache/s2graph/rest/play/Bootstrap.scala
// diff --git a/s2rest_play/app/org/apache/s2graph/rest/play/Bootstrap.scala b/s2rest_play/app/org/apache/s2graph/rest/play/Bootstrap.scala
//   new file mode 100644, index 0000000..8474b88, @@ -0,0 +1,81 @@
// ----------------------------------------------------------------------
package org.apache.s2graph.rest.play

import java.util.concurrent.Executors

import org.apache.s2graph.core.rest.{RequestParser, RestHandler}
import org.apache.s2graph.core.utils.logger
import org.apache.s2graph.core.{ExceptionHandler, Graph, Management}
import org.apache.s2graph.rest.play.actors.QueueActor
import org.apache.s2graph.rest.play.config.Config
import org.apache.s2graph.rest.play.controllers.ApplicationController
import play.api.Application
import play.api.mvc.{WithFilters, _}
import play.filters.gzip.GzipFilter

import scala.concurrent.{ExecutionContext, Future}
import scala.io.Source
import scala.util.Try

/**
 * Play global settings: builds the shared Graph / Management / RequestParser / RestHandler
 * instances on start-up, tears them down on stop and logs framework-level errors.
 * Responses pass through a `GzipFilter`.
 */
object Global extends WithFilters(new GzipFilter()) {
  var s2graph: Graph = _
  var storageManagement: Management = _
  var s2parser: RequestParser = _
  var s2rest: RestHandler = _

  /** Text stored in `ApplicationController.deployInfo` when the release file cannot be read. */
  private val ReleaseInfoNotFound = "release info not found\n"

  /**
   * Reads the whole file at `path`, closing it afterwards.
   * Any non-fatal failure (e.g. the file does not exist) yields [[ReleaseInfoNotFound]].
   */
  private def readReleaseInfo(path: String): String =
    Try {
      val source = Source.fromFile(path)
      try source.mkString finally source.close()
    }.getOrElse(ReleaseInfoNotFound)

  // Application entry point
  override def onStart(app: Application): Unit = {
    // Report unhealthy until initialisation below has finished.
    ApplicationController.isHealthy = false

    // Fixed-size pool with one thread per available processor, shared by Graph and RestHandler.
    // NOTE(review): this pool is never shut down in onStop — confirm whether Graph.shutdown()
    // still needs it before adding a threadPool.shutdown() there.
    val numOfThread = Runtime.getRuntime.availableProcessors()
    val threadPool = Executors.newFixedThreadPool(numOfThread)
    val ec = ExecutionContext.fromExecutor(threadPool)

    val config = Config.conf.underlying

    // init s2graph with config
    s2graph = new Graph(config)(ec)
    storageManagement = new Management(s2graph)
    s2parser = new RequestParser(s2graph.config) // merged config
    s2rest = new RestHandler(s2graph)(ec)

    QueueActor.init(s2graph)

    if (Config.IS_WRITE_SERVER && Config.KAFKA_PRODUCER_POOL_SIZE > 0) {
      ExceptionHandler.apply(config)
    }

    val defaultHealthOn = Config.conf.getBoolean("app.health.on").getOrElse(true)
    ApplicationController.deployInfo = readReleaseInfo("./release_info")

    ApplicationController.isHealthy = defaultHealthOn
    logger.info(s"starts with num of thread: $numOfThread, ${threadPool.getClass.getSimpleName}")
  }

  override def onStop(app: Application): Unit = {
    QueueActor.shutdown()

    if (Config.IS_WRITE_SERVER && Config.KAFKA_PRODUCER_POOL_SIZE > 0) {
      ExceptionHandler.shutdown()
    }

    /**
     * shutdown hbase client for flush buffers.
     */
    // s2graph is still null if onStart failed before assigning it.
    Option(s2graph).foreach(_.shutdown())
  }

  override def onError(request: RequestHeader, ex: Throwable): Future[Result] = {
    logger.error(s"onError => ip:${request.remoteAddress}, request:${request}", ex)
    Future.successful(Results.InternalServerError)
  }

  override def onHandlerNotFound(request: RequestHeader): Future[Result] = {
    logger.error(s"onHandlerNotFound => ip:${request.remoteAddress}, request:${request}")
    Future.successful(Results.NotFound)
  }

  override def onBadRequest(request: RequestHeader, error: String): Future[Result] = {
    logger.error(s"onBadRequest => ip:${request.remoteAddress}, request:$request, error:$error")
    Future.successful(Results.BadRequest(error))
  }
}

// ----------------------------------------------------------------------
// Next file (its imports and body continue on the following line of this archive):
// File: s2rest_play/app/org/apache/s2graph/rest/play/actors/QueueActor.scala
// http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2rest_play/app/org/apache/s2graph/rest/play/actors/QueueActor.scala
// diff --git a/s2rest_play/app/org/apache/s2graph/rest/play/actors/QueueActor.scala b/s2rest_play/app/org/apache/s2graph/rest/play/actors/QueueActor.scala
//   new file mode 100644, index 0000000..2559fd1, @@ -0,0 +1,89 @@
// package org.apache.s2graph.rest.play.actors
// ----------------------------------------------------------------------
+import java.util.concurrent.TimeUnit + +import akka.actor._ +import org.apache.s2graph.core.ExceptionHandler._ +import org.apache.s2graph.core.utils.logger +import org.apache.s2graph.core.{ExceptionHandler, Graph, GraphElement} +import org.apache.s2graph.rest.play.actors.Protocol.FlushAll +import org.apache.s2graph.rest.play.config.Config +import play.api.Play.current +import play.api.libs.concurrent.Akka + +import scala.collection.mutable +import scala.concurrent.duration.Duration + +object Protocol { + + case object Flush + + case object FlushAll + +} + +object QueueActor { + /** we are throttling down here so fixed number of actor to constant */ + var router: ActorRef = _ + + // Akka.system.actorOf(props(), name = "queueActor") + def init(s2: Graph) = { + router = Akka.system.actorOf(props(s2)) + } + + def shutdown() = { + router ! FlushAll + Akka.system.shutdown() + Thread.sleep(Config.ASYNC_HBASE_CLIENT_FLUSH_INTERVAL * 2) + } + + def props(s2: Graph): Props = Props(classOf[QueueActor], s2) +} + +class QueueActor(s2: Graph) extends Actor with ActorLogging { + + import Protocol._ + + implicit val ec = context.system.dispatcher + // logger.error(s"QueueActor: $self") + val queue = mutable.Queue.empty[GraphElement] + var queueSize = 0L + val maxQueueSize = Config.LOCAL_QUEUE_ACTOR_MAX_QUEUE_SIZE + val timeUnitInMillis = 10 + val rateLimitTimeStep = 1000 / timeUnitInMillis + val rateLimit = Config.LOCAL_QUEUE_ACTOR_RATE_LIMIT / rateLimitTimeStep + + + context.system.scheduler.schedule(Duration.Zero, Duration(timeUnitInMillis, TimeUnit.MILLISECONDS), self, Flush) + + override def receive: Receive = { + case element: GraphElement => + + if (queueSize > maxQueueSize) { + ExceptionHandler.enqueue(toKafkaMessage(Config.KAFKA_FAIL_TOPIC, element, None)) + } else { + queueSize += 1L + queue.enqueue(element) + } + + case Flush => + val elementsToFlush = + if (queue.size < rateLimit) queue.dequeueAll(_ => true) + else (0 until rateLimit).map(_ => queue.dequeue()) + + val 
flushSize = elementsToFlush.size + + queueSize -= elementsToFlush.length + s2.mutateElements(elementsToFlush) + + if (flushSize > 0) { + logger.info(s"flush: $flushSize, $queueSize") + } + + case FlushAll => + s2.mutateElements(queue) + context.stop(self) + + case _ => logger.error("unknown protocol") + } +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2rest_play/app/org/apache/s2graph/rest/play/config/Config.scala ---------------------------------------------------------------------- diff --git a/s2rest_play/app/org/apache/s2graph/rest/play/config/Config.scala b/s2rest_play/app/org/apache/s2graph/rest/play/config/Config.scala new file mode 100644 index 0000000..0254506 --- /dev/null +++ b/s2rest_play/app/org/apache/s2graph/rest/play/config/Config.scala @@ -0,0 +1,41 @@ +package org.apache.s2graph.rest.play.config + +import play.api.Play + +object Config { + // HBASE + lazy val HBASE_ZOOKEEPER_QUORUM = conf.getString("hbase.zookeeper.quorum").getOrElse("localhost") + + + // HBASE CLIENT + lazy val ASYNC_HBASE_CLIENT_FLUSH_INTERVAL = conf.getInt("async.hbase.client.flush.interval").getOrElse(1000).toShort + lazy val RPC_TIMEOUT = conf.getInt("hbase.client.operation.timeout").getOrElse(1000) + lazy val MAX_ATTEMPT = conf.getInt("hbase.client.operation.maxAttempt").getOrElse(3) + + // PHASE + lazy val PHASE = conf.getString("phase").getOrElse("dev") + lazy val conf = Play.current.configuration + + // CACHE + lazy val CACHE_TTL_SECONDS = conf.getInt("cache.ttl.seconds").getOrElse(600) + lazy val CACHE_MAX_SIZE = conf.getInt("cache.max.size").getOrElse(10000) + + //KAFKA + lazy val KAFKA_METADATA_BROKER_LIST = conf.getString("kafka.metadata.broker.list").getOrElse("localhost") + lazy val KAFKA_PRODUCER_POOL_SIZE = conf.getInt("kafka.producer.pool.size").getOrElse(0) + lazy val KAFKA_LOG_TOPIC = s"s2graphIn${PHASE}" + lazy val KAFKA_LOG_TOPIC_ASYNC = s"s2graphIn${PHASE}Async" + lazy val KAFKA_FAIL_TOPIC = s"s2graphIn${PHASE}Failed" + + // is 
query or write + lazy val IS_QUERY_SERVER = conf.getBoolean("is.query.server").getOrElse(true) + lazy val IS_WRITE_SERVER = conf.getBoolean("is.write.server").getOrElse(true) + + + // query limit per step + lazy val QUERY_HARD_LIMIT = conf.getInt("query.hard.limit").getOrElse(300) + + // local queue actor + lazy val LOCAL_QUEUE_ACTOR_MAX_QUEUE_SIZE = conf.getInt("local.queue.actor.max.queue.size").getOrElse(10000) + lazy val LOCAL_QUEUE_ACTOR_RATE_LIMIT = conf.getInt("local.queue.actor.rate.limit").getOrElse(1000) +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2rest_play/app/org/apache/s2graph/rest/play/config/CounterConfig.scala ---------------------------------------------------------------------- diff --git a/s2rest_play/app/org/apache/s2graph/rest/play/config/CounterConfig.scala b/s2rest_play/app/org/apache/s2graph/rest/play/config/CounterConfig.scala new file mode 100644 index 0000000..0bcdd81 --- /dev/null +++ b/s2rest_play/app/org/apache/s2graph/rest/play/config/CounterConfig.scala @@ -0,0 +1,10 @@ +package org.apache.s2graph.rest.play.config + +/** + * Created by hsleep([email protected]) on 15. 9. 3.. 
+ */ +object CounterConfig { + // kafka + lazy val KAFKA_TOPIC_COUNTER = s"s2counter-${Config.PHASE}" + lazy val KAFKA_TOPIC_COUNTER_TRX = s"s2counter-trx-${Config.PHASE}" +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2rest_play/app/org/apache/s2graph/rest/play/controllers/AdminController.scala ---------------------------------------------------------------------- diff --git a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/AdminController.scala b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/AdminController.scala new file mode 100644 index 0000000..00e0741 --- /dev/null +++ b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/AdminController.scala @@ -0,0 +1,424 @@ +package org.apache.s2graph.rest.play.controllers + +import org.apache.s2graph.core.Management +import org.apache.s2graph.core.mysqls._ +import org.apache.s2graph.core.rest.RequestParser +import org.apache.s2graph.core.utils.logger +import play.api.libs.functional.syntax._ +import play.api.libs.json._ +import play.api.mvc +import play.api.mvc.{Action, Controller} + +import scala.util.{Failure, Success, Try} + +object AdminController extends Controller { + + import ApplicationController._ + private val management: Management = org.apache.s2graph.rest.play.Global.storageManagement + private val requestParser: RequestParser = org.apache.s2graph.rest.play.Global.s2parser + + /** + * admin message formatter + * @tparam T + */ + trait AdminMessageFormatter[T] { + def toJson(msg: T): JsValue + } + + object AdminMessageFormatter { + implicit def jsValueToJson[T <: JsValue] = new AdminMessageFormatter[T] { + def toJson(js: T) = js + } + + implicit val stringToJson = new AdminMessageFormatter[String] { + def toJson(js: String) = Json.obj("message" -> js) + } + } + + def format[T: AdminMessageFormatter](f: JsValue => play.mvc.Result)(message: T) = { + val formatter = implicitly[AdminMessageFormatter[T]] + f(formatter.toJson(message)) + } + + /** + * ok 
response + * @param message + * @tparam T + * @return + */ + def ok[T: AdminMessageFormatter](message: T) = { + val formatter = implicitly[AdminMessageFormatter[T]] + Ok(formatter.toJson(message)).as(applicationJsonHeader) + } + + /** + * bad request response + * @param message + * @tparam T + * @return + */ + def bad[T: AdminMessageFormatter](message: T) = { + val formatter = implicitly[AdminMessageFormatter[T]] + BadRequest(formatter.toJson(message)).as(applicationJsonHeader) + } + + /** + * not found response + * @param message + * @tparam T + * @return + */ + def notFound[T: AdminMessageFormatter](message: T) = { + val formatter = implicitly[AdminMessageFormatter[T]] + NotFound(formatter.toJson(message)).as(applicationJsonHeader) + } + + private[AdminController] def tryResponse[T, R: AdminMessageFormatter](res: Try[T])(callback: T => R): mvc.Result = res match { + case Success(m) => + val ret = callback(m) + logger.info(ret.toString) + ok(ret) + case Failure(error) => + logger.error(error.getMessage, error) + error match { + case JsResultException(e) => bad(JsError.toFlatJson(e)) + case _ => bad(error.getMessage) + } + } + + def optionResponse[T, R: AdminMessageFormatter](res: Option[T])(callback: T => R): mvc.Result = res match { + case Some(m) => ok(callback(m)) + case None => notFound("not found") + } + + /** + * load all model cache + * @return + */ + def loadCache() = Action { request => + val startTs = System.currentTimeMillis() + + if (!ApplicationController.isHealthy) { + loadCacheInner() + } + + ok(s"${System.currentTimeMillis() - startTs}") + } + + def loadCacheInner() = { + Service.findAll() + ServiceColumn.findAll() + Label.findAll() + LabelMeta.findAll() + LabelIndex.findAll() + ColumnMeta.findAll() + } + + /** + * read + */ + + /** + * get service info + * @param serviceName + * @return + */ + def getService(serviceName: String) = Action { request => + val serviceOpt = Management.findService(serviceName) + optionResponse(serviceOpt)(_.toJson) + } 
// ---- AdminController (continued): remaining read / create / delete / update endpoints ----

  /**
   * get label info
   * @param labelName name of the label to look up via `Management.findLabel`
   * @return an action answering with the label as JSON, or "not found"
   */
  def getLabel(labelName: String) = Action { _ =>
    optionResponse(Management.findLabel(labelName))(_.toJson)
  }

  /**
   * get all labels of service
   * @param serviceName name of the service
   * @return an action answering with `{"from": [...], "to": [...]}`: the labels found by source
   *         service id and by target service id, or a not-found message for an unknown service
   */
  def getLabels(serviceName: String) = Action { _ =>
    Service.findByName(serviceName) match {
      case None => notFound(s"Service $serviceName not found")
      case Some(service) =>
        // NOTE(review): `id.get` throws when the service has no id; presumably every
        // service returned by findByName has one — confirm.
        val serviceId = service.id.get
        val src = Label.findBySrcServiceId(serviceId)
        val tgt = Label.findByTgtServiceId(serviceId)

        ok(Json.obj("from" -> src.map(_.toJson), "to" -> tgt.map(_.toJson)))
    }
  }

  /**
   * get service columns
   * @param serviceName name of the owning service
   * @param columnName name of the column
   * @return an action answering with the column as JSON, or "not found" when either the
   *         service or the column is missing; the column lookup passes `useCache = false`
   */
  def getServiceColumn(serviceName: String, columnName: String) = Action { _ =>
    val serviceColumnOpt = for {
      service <- Service.findByName(serviceName)
      serviceColumn <- ServiceColumn.find(service.id.get, columnName, useCache = false)
    } yield serviceColumn

    optionResponse(serviceColumnOpt)(_.toJson)
  }

  /**
   * create
   */

  /**
   * create service
   * @return an action that parses the JSON body and answers with the created service as JSON
   */
  def createService() = Action(parse.json) { request =>
    tryResponse(createServiceInner(request.body))(_.toJson)
  }

  /**
   * Extracts the service attributes from `jsValue` and hands them to `management.createService`.
   * @param jsValue request body describing the service
   * @return the result of `management.createService`
   */
  def createServiceInner(jsValue: JsValue) = {
    val (serviceName, cluster, tableName, preSplitSize, ttl, compressionAlgorithm) =
      requestParser.toServiceElements(jsValue)
    management.createService(serviceName, cluster, tableName, preSplitSize, ttl, compressionAlgorithm)
  }

  /**
   * create label
   * @return an action that parses the JSON body and answers with the created label as JSON
   */
  def createLabel() = Action(parse.json) { request =>
    tryResponse(createLabelInner(request.body))(_.toJson)
  }

  /**
   * Parses the label arguments from `json` and applies `management.createLabel` to them.
   * @param json request body describing the label
   * @return the created label, or the first failure from parsing or creation
   */
  def createLabelInner(json: JsValue) = for {
    labelArgs <- requestParser.toLabelElements(json)
    label <- (management.createLabel _).tupled(labelArgs)
  } yield label

  /**
   * add index
   * @return an action that parses the JSON body and answers with "<label> is updated"
   */
  def addIndex() = Action(parse.json) { request =>
    tryResponse(addIndexInner(request.body))(updated => s"${updated.label} is updated")
  }

  /**
   * Parses a label name and its indices from `json` and passes them to `Management.addIndex`.
   * @param json request body describing the indices
   * @return the updated label, or the first failure from parsing or the update
   */
  def addIndexInner(json: JsValue) = for {
    (labelName, indices) <- requestParser.toIndexElements(json)
    label <- Management.addIndex(labelName, indices)
  } yield label

  /**
   * create service column
   * @return an action that parses the JSON body and answers with `{"metas": [...]}`
   */
  def createServiceColumn() = Action(parse.json) { request =>
    val serviceColumnTry = createServiceColumnInner(request.body)
    tryResponse(serviceColumnTry) { (columns: Seq[ColumnMeta]) =>
      Json.obj("metas" -> columns.map(_.toJson))
    }
  }

  /**
   * Parses the column description from `jsValue` and passes it to `Management.createServiceColumn`.
   * @param jsValue request body describing the service column
   * @return the result of the creation, or the first failure from parsing or creation
   */
  def createServiceColumnInner(jsValue: JsValue) = for {
    (serviceName, columnName, columnType, props) <- requestParser.toServiceColumnElements(jsValue)
    serviceColumn <- Management.createServiceColumn(serviceName, columnName, columnType, props)
  } yield serviceColumn

  /**
   * delete
   */

  /**
   * delete label
   * @param labelName name of the label to delete
   * @return an action answering with "<result> is deleted"
   */
  def deleteLabel(labelName: String) = Action { _ =>
    tryResponse(deleteLabelInner(labelName))(deleted => s"$deleted is deleted")
  }

  /**
   * Delegates to `Management.deleteLabel`.
   * @param labelName name of the label to delete
   * @return the result of `Management.deleteLabel`
   */
  def deleteLabelInner(labelName: String) = Management.deleteLabel(labelName)

  /**
   * delete serviceColumn
   * @param serviceName name of the owning service
   * @param columnName name of the column to delete
   * @return an action answering with "<result> is deleted"
   */
  def deleteServiceColumn(serviceName: String, columnName: String) = Action { _ =>
    tryResponse(deleteServiceColumnInner(serviceName, columnName))(deleted => s"$deleted is deleted")
  }

  /**
   * Delegates to `Management.deleteColumn`.
   * @param serviceName name of the owning service
   * @param columnName name of the column to delete
   * @return the result of `Management.deleteColumn`
   */
  def deleteServiceColumnInner(serviceName: String, columnName: String) =
    Management.deleteColumn(serviceName, columnName)

  /**
   * update
   */

  /**
   * add Prop to label
   * @param labelName name of the label that receives the property
   * @return an action that parses the JSON body and answers with the new label meta as JSON
   */
  def addProp(labelName: String) = Action(parse.json) { request =>
    tryResponse(addPropInner(labelName, request.body))(_.toJson)
  }

  /**
   * Parses a property from `js` and passes it to `Management.addProp`.
   * @param labelName name of the label that receives the property
   * @param js request body describing the property
   * @return the new label meta, or the first failure from parsing or the update
   */
  def addPropInner(labelName: String, js: JsValue) = for {
    prop <- requestParser.toPropElements(js)
    labelMeta <- Management.addProp(labelName, prop)
  } yield labelMeta

  /**
   * add prop to serviceColumn
   * @param serviceName name of the owning service
   * @param columnName name of the column that receives the property
   * @return an action answering with the column meta as JSON, or a bad-request message when
   *         `addServiceColumnPropInner` yields nothing
   */
  def addServiceColumnProp(serviceName: String, columnName: String) = Action(parse.json) { request =>
    addServiceColumnPropInner(serviceName, columnName)(request.body) match {
      case None => bad(s"can`t find service with $serviceName or can`t find serviceColumn with $columnName")
      case Some(m) => Ok(m.toJson).as(applicationJsonHeader)
    }
  }

  /**
   * Looks up the service and column, parses the property and calls `ColumnMeta.findOrInsert`.
   *
   * The result is `None` when the service is unknown, the column is unknown, or the property
   * cannot be parsed (the parse failure is discarded by `toOption`).
   *
   * @param serviceName name of the owning service
   * @param columnName name of the column that receives the property
   * @param js request body describing the property
   * @return the column meta returned by `ColumnMeta.findOrInsert`, if all lookups succeeded
   */
  def addServiceColumnPropInner(serviceName: String, columnName: String)(js: JsValue) =
    for {
      service <- Service.findByName(serviceName)
      serviceColumn <- ServiceColumn.find(service.id.get, columnName)
      prop <- requestParser.toPropElements(js).toOption
    } yield ColumnMeta.findOrInsert(serviceColumn.id.get, prop.name, prop.defaultValue)

  /**
   * add props to serviceColumn
   *
   * A body that is not a JSON array of objects is treated as an empty list, and entries for
   * which `addServiceColumnPropInner` yields nothing are skipped, so the reported count may be
   * lower than the number of submitted entries.
   *
   * @param serviecName name of the owning service (the misspelling is part of the existing signature)
   * @param columnName name of the column that receives the properties
   * @return an action answering with "<n> is added."
   */
  def addServiceColumnProps(serviecName: String, columnName: String) = Action(parse.json) { request =>
    val jsObjs = request.body.asOpt[List[JsObject]].getOrElse(List.empty[JsObject])
    val newProps = for {
      js <- jsObjs
      newProp <- addServiceColumnPropInner(serviecName, columnName)(js)
    } yield newProp
    ok(s"${newProps.size} is added.")
  }

  /**
   * copy label
   * @param oldLabelName name of the label to copy
   * @param newLabelName name of the copy; also passed as the optional third argument of
   *                     `management.copyLabel`
   * @return an action answering with "<label> created"
   */
  def copyLabel(oldLabelName: String, newLabelName: String) = Action { _ =>
    val copyTry = management.copyLabel(oldLabelName, newLabelName, Some(newLabelName))
    // A separating space was missing here, which produced e.g. "friendscreated".
    tryResponse(copyTry)(copied => s"${copied.label} created")
  }

  /**
   * rename label
   * @param oldLabelName current name of the label
   * @param newLabelName new name of the label
   * @return an action answering with "Label was updated", or a not-found message when no
   *         label named `oldLabelName` exists
   */
  def renameLabel(oldLabelName: String, newLabelName: String) = Action { _ =>
    Label.findByName(oldLabelName) match {
      // Answer through the shared helper so the 404 carries a JSON body, as getLabels does.
      case None => notFound(s"Label $oldLabelName not found")
      case Some(_) =>
        // NOTE(review): the return value of updateLabelName is discarded — confirm it cannot
        // signal a failure that should be reported to the caller.
        Management.updateLabelName(oldLabelName, newLabelName)
        ok("Label was updated")
    }
  }

  /**
   * update HTable for a label
   * @param labelName name of the label
   * @param newHTableName name of the HTable to switch to
   * @return an action answering with "<result> label(s) updated."
   */
  def updateHTable(labelName: String, newHTableName: String) = Action { _ =>
    val updateTry = Management.updateHTable(labelName, newHTableName)
    tryResponse(updateTry)(updated => s"$updated label(s) updated.")
  }

  /**
   * Request payload of `createHTable`.
   *
   * @param cluster value of the "cluster" field
   * @param hTableName value of the "hTableName" field
   * @param preSplitSize value of the "preSplitSize" field
   * @param hTableTTL value of the optional "hTableTTL" field
   * @param compressionAlgorithm value of the optional "compressionAlgorithm" field
   */
  case class HTableParams(cluster: String, hTableName: String,
                          preSplitSize: Int, hTableTTL: Option[Int], compressionAlgorithm: Option[String]) {

    /** Multi-line rendering used when the parameters are logged. */
    override def toString(): String = {
      s"""HtableParams
         |-- cluster : $cluster
         |-- hTableName : $hTableName
         |-- preSplitSize : $preSplitSize
         |-- hTableTTL : $hTableTTL
         |-- compressionAlgorithm : $compressionAlgorithm
         |""".stripMargin
    }
  }

  /** JSON (de)serializer for [[HTableParams]]; the two optional fields may be absent or null. */
  implicit object HTableParamsJsonConverter extends Format[HTableParams] {
    def reads(json: JsValue): JsResult[HTableParams] = (
      (__ \ "cluster").read[String] and
        (__ \ "hTableName").read[String] and
        (__ \ "preSplitSize").read[Int] and
        (__ \ "hTableTTL").readNullable[Int] and
        (__ \ "compressionAlgorithm").readNullable[String])(HTableParams.apply _).reads(json)

    def writes(o: HTableParams): JsValue = Json.obj(
      "cluster" -> o.cluster,
      "hTableName" -> o.hTableName,
      "preSplitSize" -> o.preSplitSize,
      "hTableTTL" -> o.hTableTTL,
      "compressionAlgorithm" -> o.compressionAlgorithm
    )
  }

  /**
   * Renders a `JsError` as `{"errors": [{"path": ..., "validationErrors": [{"message", "args"}]}]}`.
   * `Int` arguments become JSON numbers, every other argument is rendered with `toString`.
   */
  implicit object JsErrorJsonWriter extends Writes[JsError] {
    def writes(o: JsError): JsValue = Json.obj(
      "errors" -> JsArray(
        o.errors.map {
          case (path, validationErrors) => Json.obj(
            "path" -> Json.toJson(path.toString()),
            "validationErrors" -> JsArray(validationErrors.map(validationError => Json.obj(
              "message" -> JsString(validationError.message),
              "args" -> JsArray(validationError.args.map {
                case x: Int => JsNumber(x)
                case x => JsString(x.toString)
              })
            )))
          )
        }
      )
    )
  }

  /**
   * Creates an HTable from the [[HTableParams]] in the request body.
   *
   * The table is created with the column families "e" and "v"; when no compression algorithm
   * is given, `Management.DefaultCompressionAlgorithm` is used.
   *
   * @return an action answering with "HTable was created.", with the validation errors as JSON
   *         when the body does not match [[HTableParams]], or with "Invalid Json." when the
   *         body is not JSON at all
   */
  def createHTable() = Action { request =>
    // Management.createTable(cluster, hTableName, List("e", "v"), preSplitSize, hTableTTL, compressionAlgorithm)
    request.body.asJson.map { json =>
      json.validate[HTableParams] match {
        case JsSuccess(hTableParams, _) =>
          management.createTable(hTableParams.cluster, hTableParams.hTableName, List("e", "v"),
            hTableParams.preSplitSize, hTableParams.hTableTTL,
            hTableParams.compressionAlgorithm.getOrElse(Management.DefaultCompressionAlgorithm))
          logger.info(hTableParams.toString())
          ok("HTable was created.")
        case err: JsError => bad(Json.toJson(err))
      }
    }.getOrElse(bad("Invalid Json."))
  }
}

/*
 * ---- patch boundary: the text below belongs to a different file (diff header kept verbatim) ----
 * http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2rest_play/app/org/apache/s2graph/rest/play/controllers/ApplicationController.scala
 * ----------------------------------------------------------------------
 * diff --git a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/ApplicationController.scala b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/ApplicationController.scala
 * new file mode 100644
 * index 0000000..3482f6d
 * --- /dev/null
 * +++ b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/ApplicationController.scala
 * @@ -0,0 +1,101 @@
 */
package org.apache.s2graph.rest.play.controllers

import org.apache.s2graph.core.GraphExceptions.BadQueryException
import org.apache.s2graph.core.PostProcess
import org.apache.s2graph.core.utils.logger
import play.api.libs.iteratee.Enumerator
import play.api.libs.json.{JsString, JsValue}
import play.api.mvc._

import scala.concurrent.{ExecutionContext, Future}

/**
 * Shared controller plumbing: the health flag, JSON response helpers, the fallback used when a
 * request fails, and action builders that log one line per request.
 */
object ApplicationController extends Controller {

  /** Health flag; flipped by `updateHealthCheck` and read by `healthCheck` and `jsonResponse`. */
  var isHealthy = true

  /** Text returned by `healthCheck` while the node is healthy. */
  var deployInfo = ""

  /** Content type applied to every JSON response built here. */
  val applicationJsonHeader = "application/json"

  val jsonParser: BodyParser[JsValue] = s2parse.json

  val jsonText: BodyParser[String] = s2parse.jsonText

  /** 400 response whose body is `PostProcess.badRequestResults(ex)`. */
  private def badQueryExceptionResults(ex: Exception) =
    Future.successful(BadRequest(PostProcess.badRequestResults(ex)).as(applicationJsonHeader))

  /** 200 response whose body is `PostProcess.emptyResults`; used for every non-query failure. */
  private def errorResults =
    Future.successful(Ok(PostProcess.emptyResults).as(applicationJsonHeader))

  /**
   * Recovery function for failed requests.
   *
   * A `BadQueryException` is logged and answered with a 400; any other `Exception` is logged
   * and answered with the empty-results 200. Throwables that are not `Exception`s are not
   * handled, so they keep propagating.
   *
   * @param body request body, included in the log line
   * @return a partial function from the failure to the response
   */
  def requestFallback(body: String): PartialFunction[Throwable, Future[Result]] = {
    case e: BadQueryException =>
      // This used to be s"{$body}, ...", which logged literal braces around the body.
      logger.error(s"$body, ${e.getMessage}", e)
      badQueryExceptionResults(e)
    case e: Exception =>
      logger.error(s"$body, ${e.getMessage}", e)
      errorResults
  }

  /**
   * Sets the health flag.
   * @param isHealthy new value of the flag
   * @return an action answering with the new value followed by a newline
   */
  def updateHealthCheck(isHealthy: Boolean) = Action { _ =>
    this.isHealthy = isHealthy
    Ok(s"${this.isHealthy}\n")
  }

  /**
   * Health probe.
   * @return an action answering 200 with `deployInfo` while healthy and 404 otherwise
   */
  def healthCheck() = withHeader(parse.anyContent) { _ =>
    if (isHealthy) Ok(deployInfo)
    else NotFound
  }

  /**
   * Builds a JSON response with extra headers.
   *
   * While the node is healthy this is a plain `Ok`. Otherwise the result is assembled by hand
   * with `HttpConnection.Close` and a `Connection: close` header.
   * NOTE(review): presumably this makes clients drop keep-alive connections while the node is
   * being taken out of rotation — confirm.
   *
   * @param json response body
   * @param headers additional response headers
   * @return the response
   */
  def jsonResponse(json: JsValue, headers: (String, String)*) =
    if (ApplicationController.isHealthy) {
      Ok(json).as(applicationJsonHeader).withHeaders(headers: _*)
    } else {
      Result(
        header = ResponseHeader(OK),
        // Encode explicitly: the no-argument getBytes() uses the platform default charset,
        // which would corrupt non-ASCII JSON on hosts whose default is not UTF-8.
        body = Enumerator(json.toString.getBytes(java.nio.charset.StandardCharsets.UTF_8)),
        connection = HttpConnection.Close
      ).as(applicationJsonHeader).withHeaders((CONNECTION -> "close") +: headers: _*)
    }

  /**
   * Formats the access-log line for a finished request:
   * `<method> <uri> took <ms> ms <status> <result_size> <body>`.
   *
   * `result_size` is read from the response headers and is "-1" when absent. A JSON string
   * body is logged unquoted, other JSON bodies via `toString`, an empty body as "".
   *
   * @param request the handled request
   * @param result the response produced for it
   * @param startedAt `System.currentTimeMillis()` taken when handling started
   * @return the log line
   */
  def toLogMessage[A](request: Request[A], result: Result)(startedAt: Long): String = {
    val duration = System.currentTimeMillis() - startedAt
    val resultSize = result.header.headers.getOrElse("result_size", "-1")

    val body = request.body match {
      case AnyContentAsJson(JsString(str)) => str
      case AnyContentAsJson(jsValue) => jsValue.toString
      case AnyContentAsEmpty => ""
      case other => other.toString
    }

    // Query and non-query requests were formatted by two identical branches; one is enough.
    s"${request.method} ${request.uri} took ${duration} ms ${result.header.status} ${resultSize} ${body}"
  }

  /**
   * Asynchronous action that logs one line (see `toLogMessage`) once `block` completes
   * successfully; a failed future is passed through without a log line.
   *
   * @param bodyParser parser for the request body
   * @param block request handler
   * @param ex execution context on which the logging step runs
   * @return the wrapping action
   */
  def withHeaderAsync[A](bodyParser: BodyParser[A])(block: Request[A] => Future[Result])(implicit ex: ExecutionContext) =
    Action.async(bodyParser) { request =>
      val startedAt = System.currentTimeMillis()
      block(request).map { result =>
        logger.info(toLogMessage(request, result)(startedAt))
        result
      }
    }

  /**
   * Synchronous action that logs one line (see `toLogMessage`) after `block` returns.
   *
   * @param bodyParser parser for the request body
   * @param block request handler
   * @return the wrapping action
   */
  def withHeader[A](bodyParser: BodyParser[A])(block: Request[A] => Result) =
    Action(bodyParser) { request: Request[A] =>
      val startedAt = System.currentTimeMillis()
      val result = block(request)
      logger.info(toLogMessage(request, result)(startedAt))
      result
    }
}
