http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2rest_play/app/org/apache/s2graph/rest/play/controllers/CounterController.scala ---------------------------------------------------------------------- diff --git a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/CounterController.scala b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/CounterController.scala new file mode 100644 index 0000000..4587d59 --- /dev/null +++ b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/CounterController.scala @@ -0,0 +1,744 @@ +package org.apache.s2graph.rest.play.controllers + +import org.apache.kafka.clients.producer.ProducerRecord +import org.apache.s2graph.core.ExceptionHandler +import org.apache.s2graph.core.ExceptionHandler.KafkaMessage +import org.apache.s2graph.core.mysqls.Label +import org.apache.s2graph.counter +import org.apache.s2graph.counter.config.S2CounterConfig +import org.apache.s2graph.counter.core.TimedQualifier.IntervalUnit +import org.apache.s2graph.counter.core._ +import org.apache.s2graph.counter.core.v1.{RankingStorageRedis, ExactStorageAsyncHBase} +import org.apache.s2graph.counter.core.v2.{RankingStorageGraph, ExactStorageGraph} +import org.apache.s2graph.counter.models.Counter.ItemType +import org.apache.s2graph.counter.models.{Counter, CounterModel} +import org.apache.s2graph.counter.util.{ReduceMapValue, CartesianProduct, UnitConverter} +import org.apache.s2graph.rest.play.config.CounterConfig +import org.apache.s2graph.rest.play.models._ +import play.api.Play +import play.api.libs.json.Reads._ +import play.api.libs.json._ +import play.api.mvc.{Action, Controller, Request} +import scala.concurrent.Future +import scala.util.{Failure, Success, Try} + +object CounterController extends Controller { + import play.api.libs.concurrent.Execution.Implicits.defaultContext + + val config = Play.current.configuration.underlying + val s2config = new S2CounterConfig(config) + + private val exactCounterMap = Map( + counter.VERSION_1 -> new ExactCounter(config, new ExactStorageAsyncHBase(config)), + counter.VERSION_2 -> new ExactCounter(config, new ExactStorageGraph(config)) + ) + private val rankingCounterMap = Map( + counter.VERSION_1 -> new RankingCounter(config, new RankingStorageRedis(config)), + counter.VERSION_2 -> new RankingCounter(config, new RankingStorageGraph(config)) + ) + + private val tablePrefixMap = Map ( + counter.VERSION_1 -> "s2counter", + counter.VERSION_2 -> "s2counter_v2" + ) + + private def exactCounter(version: Byte): ExactCounter = exactCounterMap(version) + private def rankingCounter(version: Byte): RankingCounter = rankingCounterMap(version) + + lazy val counterModel = new CounterModel(config) + + def getQueryString[T](key: String, default: String)(implicit request: Request[T]): String = { + request.getQueryString(key).getOrElse(default) + } + + implicit val counterWrites = new Writes[Counter] { + override def writes(o: Counter): JsValue = Json.obj( + "version" -> o.version.toInt, + "autoComb" -> o.autoComb, + "dimension" -> o.dimension, + "useProfile" -> o.useProfile, + "bucketImpId" -> o.bucketImpId, + "useRank" -> o.useRank, + "intervalUnit" -> o.intervalUnit, + "ttl" -> o.ttl, + "dailyTtl" -> o.dailyTtl, + "rateAction" -> o.rateActionId.flatMap { actionId => + counterModel.findById(actionId, useCache = false).map { actionPolicy => + Json.obj("service" -> actionPolicy.service, "action" -> actionPolicy.action) + } + }, + "rateBase" -> o.rateBaseId.flatMap { baseId => + counterModel.findById(baseId, useCache = 
false).map { basePolicy => + Json.obj("service" -> basePolicy.service, "action" -> basePolicy.action) + } + }, + "rateThreshold" -> o.rateThreshold + ) + } + + def createAction(service: String, action: String) = Action(s2parse.json) { implicit request => + counterModel.findByServiceAction(service, action, useCache = false) match { + case None => + val body = request.body + val version = (body \ "version").asOpt[Int].map(_.toByte).getOrElse(counter.VERSION_2) + val autoComb = (body \ "autoComb").asOpt[Boolean].getOrElse(true) + val dimension = (body \ "dimension").asOpt[String].getOrElse("") + val useProfile = (body \ "useProfile").asOpt[Boolean].getOrElse(false) + val bucketImpId = (body \ "bucketImpId").asOpt[String] + + val useExact = (body \ "useExact").asOpt[Boolean].getOrElse(true) + val useRank = (body \ "useRank").asOpt[Boolean].getOrElse(true) + + val intervalUnit = (body \ "intervalUnit").asOpt[String] + // 2 day + val ttl = (body \ "ttl").asOpt[Int].getOrElse(2 * 24 * 60 * 60) + val dailyTtl = (body \ "dailyTtl").asOpt[Int] + val regionMultiplier = (body \ "regionMultiplier").asOpt[Int].getOrElse(1) + + val rateAction = (body \ "rateAction").asOpt[Map[String, String]] + val rateBase = (body \ "rateBase").asOpt[Map[String, String]] + val rateThreshold = (body \ "rateThreshold").asOpt[Int] + + val rateActionId = { + for { + actionMap <- rateAction + service <- actionMap.get("service") + action <- actionMap.get("action") + policy <- counterModel.findByServiceAction(service, action) + } yield { + policy.id + } + } + val rateBaseId = { + for { + actionMap <- rateBase + service <- actionMap.get("service") + action <- actionMap.get("action") + policy <- counterModel.findByServiceAction(service, action) + } yield { + policy.id + } + } + + val hbaseTable = { + Seq(tablePrefixMap(version), service, ttl) ++ dailyTtl mkString "_" + } + + // find label + val itemType = Label.findByName(action, useCache = false) match { + case Some(label) => + ItemType.withName(label.tgtColumnType.toUpperCase) + case None => + val strItemType = (body \ "itemType").asOpt[String].getOrElse("STRING") + ItemType.withName(strItemType.toUpperCase) + } + val policy = Counter(useFlag = true, version, service, action, itemType, autoComb = autoComb, dimension, + useProfile = useProfile, bucketImpId, useRank = useRank, ttl, dailyTtl, Some(hbaseTable), intervalUnit, + rateActionId, rateBaseId, rateThreshold) + + // prepare exact storage + exactCounter(version).prepare(policy) + if (useRank) { + // prepare ranking storage + rankingCounter(version).prepare(policy) + } + counterModel.createServiceAction(policy) + Ok(Json.toJson(Map("msg" -> s"created $service/$action"))) + case Some(policy) => + Ok(Json.toJson(Map("msg" -> s"already exist $service/$action"))) + } + } + + def getAction(service: String, action: String) = Action { request => + counterModel.findByServiceAction(service, action, useCache = false) match { + case Some(policy) => + Ok(Json.toJson(policy)) + case None => + NotFound(Json.toJson(Map("msg" -> s"$service.$action not found"))) + } + } + + def updateAction(service: String, action: String) = Action(s2parse.json) { request => + counterModel.findByServiceAction(service, action, useCache = false) match { + case Some(oldPolicy) => + val body = request.body + val autoComb = (body \ "autoComb").asOpt[Boolean].getOrElse(oldPolicy.autoComb) + val dimension = (body \ "dimension").asOpt[String].getOrElse(oldPolicy.dimension) + val useProfile = (body \ "useProfile").asOpt[Boolean].getOrElse(oldPolicy.useProfile) + val 
bucketImpId = (body \ "bucketImpId").asOpt[String] match { + case Some(s) => Some(s) + case None => oldPolicy.bucketImpId + } + + val useRank = (body \ "useRank").asOpt[Boolean].getOrElse(oldPolicy.useRank) + + val intervalUnit = (body \ "intervalUnit").asOpt[String] match { + case Some(s) => Some(s) + case None => oldPolicy.intervalUnit + } + + val rateAction = (body \ "rateAction").asOpt[Map[String, String]] + val rateBase = (body \ "rateBase").asOpt[Map[String, String]] + val rateThreshold = (body \ "rateThreshold").asOpt[Int] match { + case Some(i) => Some(i) + case None => oldPolicy.rateThreshold + } + + val rateActionId = { + for { + actionMap <- rateAction + service <- actionMap.get("service") + action <- actionMap.get("action") + policy <- counterModel.findByServiceAction(service, action, useCache = false) + } yield { + policy.id + } + } match { + case Some(i) => Some(i) + case None => oldPolicy.rateActionId + } + val rateBaseId = { + for { + actionMap <- rateBase + service <- actionMap.get("service") + action <- actionMap.get("action") + policy <- counterModel.findByServiceAction(service, action, useCache = false) + } yield { + policy.id + } + } match { + case Some(i) => Some(i) + case None => oldPolicy.rateBaseId + } + + // new counter + val policy = Counter(id = oldPolicy.id, useFlag = oldPolicy.useFlag, oldPolicy.version, service, action, oldPolicy.itemType, autoComb = autoComb, dimension, + useProfile = useProfile, bucketImpId, useRank = useRank, oldPolicy.ttl, oldPolicy.dailyTtl, oldPolicy.hbaseTable, intervalUnit, + rateActionId, rateBaseId, rateThreshold) + + counterModel.updateServiceAction(policy) + Ok(Json.toJson(Map("msg" -> s"updated $service/$action"))) + case None => + NotFound(Json.toJson(Map("msg" -> s"$service.$action not found"))) + } + } + + def prepareAction(service: String, action: String) = Action(s2parse.json) { request => + // for data migration + counterModel.findByServiceAction(service, action, useCache = false) match { + case Some(policy) => + val body = request.body + val version = (body \ "version").as[Int].toByte + if (version != policy.version) { + // change table name + val newTableName = Seq(tablePrefixMap(version), service, policy.ttl) ++ policy.dailyTtl mkString "_" + val newPolicy = policy.copy(version = version, hbaseTable = Some(newTableName)) + exactCounter(version).prepare(newPolicy) + if (newPolicy.useRank) { + rankingCounter(version).prepare(newPolicy) + } + Ok(Json.toJson(Map("msg" -> s"prepare storage v$version $service/$action"))) + } else { + Ok(Json.toJson(Map("msg" -> s"already prepared storage v$version $service/$action"))) + } + case None => + NotFound(Json.toJson(Map("msg" -> s"$service.$action not found"))) + } + } + + def deleteAction(service: String, action: String) = Action.apply { + { + for { + policy <- counterModel.findByServiceAction(service, action, useCache = false) + } yield { + Try { + exactCounter(policy.version).destroy(policy) + if (policy.useRank) { + rankingCounter(policy.version).destroy(policy) + } + counterModel.deleteServiceAction(policy) + } match { + case Success(v) => + Ok(Json.toJson(Map("msg" -> s"deleted $service/$action"))) + case Failure(ex) => + throw ex + } + } + }.getOrElse(NotFound(Json.toJson(Map("msg" -> s"$service.$action not found")))) + } + + def getExactCountAsync(service: String, action: String, item: String) = Action.async { implicit request => + val intervalUnits = getQueryString("interval", getQueryString("step", "t")).split(',').toSeq + .map(IntervalUnit.withName) + val limit = 
getQueryString("limit", "1").toInt + + val qsSum = request.getQueryString("sum") + + val optFrom = request.getQueryString("from").filter(!_.isEmpty).map(UnitConverter.toMillis) + val optTo = request.getQueryString("to").filter(!_.isEmpty).map(UnitConverter.toMillis) + + val limitOpt = (optFrom, optTo) match { + case (Some(_), Some(_)) => + None + case _ => + Some(limit) + } + + // find dimension + lazy val dimQueryValues = request.queryString.filterKeys { k => k.charAt(0) == ':' }.map { case (k, v) => + (k.substring(1), v.mkString(",").split(',').filter(_.nonEmpty).toSet) + } +// Logger.warn(s"$dimQueryValues") + + counterModel.findByServiceAction(service, action) match { + case Some(policy) => + val tqs = TimedQualifier.getQualifiersToLimit(intervalUnits, limit, optFrom, optTo) + val timeRange = TimedQualifier.getTimeRange(intervalUnits, limit, optFrom, optTo) + try { +// Logger.warn(s"$tqs $qsSum") + if (tqs.head.length > 1 && qsSum.isDefined) { + getDecayedCountToJs(policy, item, timeRange, dimQueryValues, qsSum).map { jsVal => + Ok(jsVal) + } + } else { + getExactCountToJs(policy, item, timeRange, limitOpt, dimQueryValues).map { jsVal => + Ok(jsVal) + } + } + } catch { + case e: Exception => + throw e +// Future.successful(BadRequest(s"$service, $action, $item")) + } + case None => + Future.successful(NotFound(Json.toJson(Map("msg" -> s"$service.$action not found")))) + } + } + + /** + * [{ + * "service": , "action", "itemIds": [], "interval": string, "limit": int, "from": ts, "to": ts, + * "dimensions": [{"key": list[String]}] + * }] + * @return + */ + private def parseExactCountParam(jsValue: JsValue) = { + val service = (jsValue \ "service").as[String] + val action = (jsValue \ "action").as[String] + val itemIds = (jsValue \ "itemIds").as[Seq[String]] + val intervals = (jsValue \ "intervals").asOpt[Seq[String]].getOrElse(Seq("t")).distinct.map(IntervalUnit.withName) + val limit = (jsValue \ "limit").asOpt[Int].getOrElse(1) + val from = (jsValue \ "from").asOpt[Long] + val to = (jsValue \ "to").asOpt[Long] + val sum = (jsValue \ "sum").asOpt[String] + val dimensions = { + for { + dimension <- (jsValue \ "dimensions").asOpt[Seq[JsObject]].getOrElse(Nil) + (k, vs) <- dimension.fields + } yield { + k -> vs.as[Seq[String]].toSet + } + }.toMap + (service, action, itemIds, intervals, limit, from, to, dimensions, sum) + } + + def getExactCountAsyncMulti = Action.async(s2parse.json) { implicit request => + val jsValue = request.body + try { + val futures = { + for { + jsObject <- jsValue.asOpt[List[JsObject]].getOrElse(Nil) + (service, action, itemIds, intervalUnits, limit, from, to, dimQueryValues, qsSum) = parseExactCountParam(jsObject) + optFrom = from.map(UnitConverter.toMillis) + optTo = to.map(UnitConverter.toMillis) + timeRange = TimedQualifier.getTimeRange(intervalUnits, limit, optFrom, optTo) + policy <- counterModel.findByServiceAction(service, action).toSeq + item <- itemIds + } yield { + val tqs = TimedQualifier.getQualifiersToLimit(intervalUnits, limit, optFrom, optTo) + val timeRange = TimedQualifier.getTimeRange(intervalUnits, limit, optFrom, optTo) + val limitOpt = (optFrom, optTo) match { + case (Some(_), Some(_)) => + None + case _ => + Some(limit) + } + +// Logger.warn(s"$item, $limit, $optFrom, $optTo, $qsSum, $tqs") + + if (tqs.head.length > 1 && qsSum.isDefined) { + getDecayedCountToJs(policy, item, timeRange, dimQueryValues, qsSum) + } else { + getExactCountToJs(policy, item, timeRange, limitOpt, dimQueryValues) + } + } + } + Future.sequence(futures).map { rets => + 
Ok(Json.toJson(rets)) + } + } catch { + case e: Exception => + throw e +// Future.successful(BadRequest(s"$jsValue")) + } + } + + private [controllers] def fetchedToResult(fetchedCounts: FetchedCountsGrouped, limitOpt: Option[Int]): Seq[ExactCounterIntervalItem] = { + for { + ((interval, dimKeyValues), values) <- fetchedCounts.intervalWithCountMap + } yield { + val counterItems = { + val sortedItems = values.toSeq.sortBy { case (eq, v) => -eq.tq.ts } + val limited = limitOpt match { + case Some(limit) => sortedItems.take(limit) + case None => sortedItems + } + for { + (eq, value) <- limited + } yield { + ExactCounterItem(eq.tq.ts, value, value.toDouble) + } + } + ExactCounterIntervalItem(interval.toString, dimKeyValues, counterItems) + } + }.toSeq + + private def decayedToResult(decayedCounts: DecayedCounts): Seq[ExactCounterIntervalItem] = { + for { + (eq, score) <- decayedCounts.qualifierWithCountMap + } yield { + ExactCounterIntervalItem(eq.tq.q.toString, eq.dimKeyValues, Seq(ExactCounterItem(eq.tq.ts, score.toLong, score))) + } + }.toSeq + + private def getExactCountToJs(policy: Counter, + item: String, + timeRange: Seq[(TimedQualifier, TimedQualifier)], + limitOpt: Option[Int], + dimQueryValues: Map[String, Set[String]]): Future[JsValue] = { + exactCounter(policy.version).getCountsAsync(policy, Seq(item), timeRange, dimQueryValues).map { seq => + val items = { + for { + fetched <- seq + } yield { + fetchedToResult(fetched, limitOpt) + } + }.flatten + Json.toJson(ExactCounterResult(ExactCounterResultMeta(policy.service, policy.action, item), items)) + } + } + + private def getDecayedCountToJs(policy: Counter, + item: String, + timeRange: Seq[(TimedQualifier, TimedQualifier)], + dimQueryValues: Map[String, Set[String]], + qsSum: Option[String]): Future[JsValue] = { + exactCounter(policy.version).getDecayedCountsAsync(policy, Seq(item), timeRange, dimQueryValues, qsSum).map { seq => + val decayedCounts = seq.head + val meta = ExactCounterResultMeta(policy.service, policy.action, decayedCounts.exactKey.itemKey) + val intervalItems = decayedToResult(decayedCounts) + Json.toJson(ExactCounterResult(meta, intervalItems)) + } + } + + def getRankingCountAsync(service: String, action: String) = Action.async { implicit request => + lazy val intervalUnits = getQueryString("interval", getQueryString("step", "t")).split(',').toSeq + .map(IntervalUnit.withName) + lazy val limit = getQueryString("limit", "1").toInt + lazy val kValue = getQueryString("k", "10").toInt + + lazy val qsSum = request.getQueryString("sum") + + lazy val optFrom = request.getQueryString("from").filter(!_.isEmpty).map(UnitConverter.toMillis) + lazy val optTo = request.getQueryString("to").filter(!_.isEmpty).map(UnitConverter.toMillis) + + // find dimension + lazy val dimensionMap = request.queryString.filterKeys { k => k.charAt(0) == ':' }.map { case (k, v) => + (k.substring(1), v.mkString(",").split(',').toList) + } + + val dimensions = { + for { + values <- CartesianProduct(dimensionMap.values.toList).toSeq + } yield { + dimensionMap.keys.zip(values).toMap + } + } + + counterModel.findByServiceAction(service, action) match { + case Some(policy) => + val tqs = TimedQualifier.getQualifiersToLimit(intervalUnits, limit, optTo) + val dimKeys = { + for { + dimension <- dimensions + } yield { + dimension -> tqs.map(tq => RankingKey(policy.id, policy.version, ExactQualifier(tq, dimension))) + } + } + + // if tqs has only 1 tq, do not apply sum function + try { + val rankResult = { + if (tqs.length > 1 && qsSum.isDefined) { + 
getSumRankCounterResultAsync(policy, dimKeys, kValue, qsSum) + } else { + // no summary + Future.successful(getRankCounterResult(policy, dimKeys, kValue)) + } + } + + rankResult.map { result => + Ok(Json.toJson(result)) + } + } catch { + case e: UnsupportedOperationException => + Future.successful(NotImplemented(Json.toJson( + Map("msg" -> e.getMessage) + ))) + case e: Throwable => + throw e + } + case None => + Future.successful(NotFound(Json.toJson(Map("msg" -> s"$service.$action not found")))) + } + } + + def deleteRankingCount(service: String, action: String) = Action.async { implicit request => + lazy val intervalUnits = getQueryString("interval", getQueryString("step", "t")).split(',').toSeq + .map(IntervalUnit.withName) + lazy val limit = getQueryString("limit", "1").toInt + + // find dimension + lazy val dimensionMap = request.queryString.filterKeys { k => k.charAt(0) == ':' }.map { case (k, v) => + (k.substring(1), v.mkString(",").split(',').toList) + } + + val dimensions = { + for { + values <- CartesianProduct(dimensionMap.values.toList).toSeq + } yield { + dimensionMap.keys.zip(values).toMap + } + } + + Future { + counterModel.findByServiceAction(service, action) match { + case Some(policy) => + val tqs = TimedQualifier.getQualifiersToLimit(intervalUnits, limit) + val keys = { + for { + dimension <- dimensions + tq <- tqs + } yield { + RankingKey(policy.id, policy.version, ExactQualifier(tq, dimension)) + } + } + + for { + key <- keys + } { + rankingCounter(policy.version).delete(key) + } + + Ok(JsObject( + Seq( + ("msg", Json.toJson(s"delete ranking in $service.$action")), + ("items", Json.toJson({ + for { + key <- keys + } yield { + s"${key.eq.tq.q}.${key.eq.tq.ts}.${key.eq.dimension}" + } + })) + ) + )) + case None => + NotFound(Json.toJson( + Map("msg" -> s"$service.$action not found") + )) + } + } + } + + val reduceRateRankingValue = new ReduceMapValue[ExactKeyTrait, RateRankingValue](RateRankingValue.reduce, RateRankingValue(-1, -1)) + + // change format + private def getDecayedCountsAsync(policy: Counter, + items: Seq[String], + timeRange: (TimedQualifier, TimedQualifier), + dimension: Map[String, String], + qsSum: Option[String]): Future[Seq[(ExactKeyTrait, Double)]] = { + exactCounter(policy.version).getDecayedCountsAsync(policy, items, Seq(timeRange), dimension.mapValues(s => Set(s)), qsSum).map { seq => + for { + DecayedCounts(exactKey, qcMap) <- seq + value <- qcMap.values + } yield { + exactKey -> value + } + } + } + + def getSumRankCounterResultAsync(policy: Counter, + dimKeys: Seq[(Map[String, String], Seq[RankingKey])], + kValue: Int, + qsSum: Option[String]): Future[RankCounterResult] = { + val futures = { + for { + (dimension, keys) <- dimKeys + } yield { + val tqs = keys.map(rk => rk.eq.tq) + val (tqFrom, tqTo) = (tqs.last, tqs.head) + val items = rankingCounter(policy.version).getAllItems(keys, kValue) +// Logger.warn(s"item count: ${items.length}") + val future = { + if (policy.isRateCounter) { + val actionPolicy = policy.rateActionId.flatMap(counterModel.findById(_)).get + val basePolicy = policy.rateBaseId.flatMap(counterModel.findById(_)).get + + val futureAction = getDecayedCountsAsync(actionPolicy, items, (tqFrom, tqTo), dimension, qsSum).map { seq => + seq.map { case (k, score) => + ExactKey(policy, k.itemKey, checkItemType = false) -> RateRankingValue(score, -1) + }.toMap + } + val futureBase = getDecayedCountsAsync(basePolicy, items, (tqFrom, tqTo), dimension, qsSum).map { seq => + seq.map { case (k, score) => + ExactKey(policy, k.itemKey, 
checkItemType = false) -> RateRankingValue(-1, score) + }.toMap + } + futureAction.zip(futureBase).map { case (actionScores, baseScores) => + reduceRateRankingValue(actionScores, baseScores).map { case (k, rrv) => +// Logger.warn(s"$k -> $rrv") + k -> rrv.rankingValue.score + }.toSeq + } + } + else if (policy.isTrendCounter) { + val actionPolicy = policy.rateActionId.flatMap(counterModel.findById(_)).get + val basePolicy = policy.rateBaseId.flatMap(counterModel.findById(_)).get + + val futureAction = getDecayedCountsAsync(actionPolicy, items, (tqFrom, tqTo), dimension, qsSum).map { seq => + seq.map { case (k, score) => + ExactKey(policy, k.itemKey, checkItemType = false) -> RateRankingValue(score, -1) + }.toMap + } + val futureBase = getDecayedCountsAsync(basePolicy, items, (tqFrom.add(-1), tqTo.add(-1)), dimension, qsSum).map { seq => + seq.map { case (k, score) => + ExactKey(policy, k.itemKey, checkItemType = false) -> RateRankingValue(-1, score) + }.toMap + } + futureAction.zip(futureBase).map { case (actionScores, baseScores) => + reduceRateRankingValue(actionScores, baseScores).map { case (k, rrv) => +// Logger.warn(s"$k -> $rrv") + k -> rrv.rankingValue.score + }.toSeq + } + } + else { + getDecayedCountsAsync(policy, items, (tqFrom, tqTo), dimension, qsSum) + } + } + future.map { keyWithScore => + val ranking = keyWithScore.sortBy(-_._2).take(kValue) + val rankCounterItems = { + for { + idx <- ranking.indices + (exactKey, score) = ranking(idx) + } yield { + val realId = policy.itemType match { + case ItemType.BLOB => exactCounter(policy.version).getBlobValue(policy, exactKey.itemKey) + .getOrElse(throw new Exception(s"not found blob id. ${policy.service}.${policy.action} ${exactKey.itemKey}")) + case _ => exactKey.itemKey + } + RankCounterItem(idx + 1, realId, score) + } + } + + val eq = ExactQualifier(tqFrom, dimension) + RankCounterDimensionItem(eq.tq.q.toString, eq.tq.ts, eq.dimension, -1, rankCounterItems) + } + } + } + + Future.sequence(futures).map { dimensionResultList => + RankCounterResult(RankCounterResultMeta(policy.service, policy.action), dimensionResultList) + } + } + + def getRankCounterResult(policy: Counter, dimKeys: Seq[(Map[String, String], Seq[RankingKey])], kValue: Int): RankCounterResult = { + val dimensionResultList = { + for { + (dimension, keys) <- dimKeys + key <- keys + } yield { + val rankingValue = rankingCounter(policy.version).getTopK(key, kValue) + val ranks = { + for { + rValue <- rankingValue.toSeq + idx <- rValue.values.indices + rank = idx + 1 + } yield { + val (id, score) = rValue.values(idx) + val realId = policy.itemType match { + case ItemType.BLOB => + exactCounter(policy.version) + .getBlobValue(policy, id) + .getOrElse(throw new Exception(s"not found blob id. 
${policy.service}.${policy.action} $id")) + case _ => id + } + RankCounterItem(rank, realId, score) + } + } + val eq = key.eq + val tq = eq.tq + RankCounterDimensionItem(tq.q.toString, tq.ts, eq.dimension, rankingValue.map(v => v.totalScore).getOrElse(0d), ranks) + } + } + + RankCounterResult(RankCounterResultMeta(policy.service, policy.action), dimensionResultList) + } + + type Record = ProducerRecord[String, String] + + def incrementCount(service: String, action: String, item: String) = Action.async(s2parse.json) { request => + Future { + /** + * { + * timestamp: Long + * property: {} + * value: Int + * } + */ + lazy val metaMap = Map("service" -> service, "action" -> action, "item" -> item) + counterModel.findByServiceAction(service, action).map { policy => + val body = request.body + try { + val ts = (body \ "timestamp").asOpt[Long].getOrElse(System.currentTimeMillis()).toString + val dimension = (body \ "dimension").asOpt[JsValue].getOrElse(Json.obj()) + val property = (body \ "property").asOpt[JsValue].getOrElse(Json.obj()) + + val msg = List(ts, service, action, item, dimension, property).mkString("\t") + + // produce to kafka + // hash partitioner by key + ExceptionHandler.enqueue(KafkaMessage(new Record(CounterConfig.KAFKA_TOPIC_COUNTER, s"$ts.$item", msg))) + + Ok(Json.toJson( + Map( + "meta" -> metaMap + ) + )) + } + catch { + case e: JsResultException => + BadRequest(Json.toJson( + Map("msg" -> s"need timestamp.") + )) + } + }.getOrElse { + NotFound(Json.toJson( + Map("msg" -> s"$service.$action not found") + )) + } + } + } +}
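As a reference for callers, a minimal Scala sketch (play-json assumed on the classpath) of the kind of request body that createAction above parses. The field names come straight from the parser code; the service/action names, dimension keys, TTL and rate-policy references are illustrative only.

import play.api.libs.json.Json

object CreateActionPayloadSketch extends App {
  // Field names mirror what createAction reads from the request body; the
  // service/action names, dimension keys, TTL and rate policies are hypothetical.
  val payload = Json.obj(
    "version"       -> 2,                // counter.VERSION_2
    "autoComb"      -> true,
    "dimension"     -> "gender,age",
    "useProfile"    -> false,
    "useExact"      -> true,
    "useRank"       -> true,
    "ttl"           -> 2 * 24 * 60 * 60, // default in the controller: two days, in seconds
    "itemType"      -> "STRING",         // consulted only when no label matches the action
    "rateAction"    -> Json.obj("service" -> "s1", "action" -> "click"),
    "rateBase"      -> Json.obj("service" -> "s1", "action" -> "impression"),
    "rateThreshold" -> 10
  )
  println(Json.prettyPrint(payload))
}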
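Likewise, a hedged sketch of the JSON array that getExactCountAsyncMulti accepts, following parseExactCountParam. Only the field names and the "t" interval default are taken from the code; every concrete value below is made up.

import play.api.libs.json.Json

object ExactCountQuerySketch extends App {
  // One element of the array parsed by parseExactCountParam; field names are from
  // the code, the service/action/item values and dimension values are illustrative.
  val body = Json.arr(
    Json.obj(
      "service"    -> "s1",
      "action"     -> "click",
      "itemIds"    -> Json.arr("item_1", "item_2"),
      "intervals"  -> Json.arr("t"),     // "t" is the controller's default interval
      "limit"      -> 10,
      "dimensions" -> Json.arr(Json.obj("gender" -> Json.arr("M", "F")))
    )
  )
  println(Json.prettyPrint(body))
}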
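And a small sketch of the Kafka record that incrementCount enqueues to CounterConfig.KAFKA_TOPIC_COUNTER: the partition key and the tab-separated layout mirror the controller, while the service/action/item and payload values are hypothetical.

import play.api.libs.json.Json

object IncrementMessageSketch extends App {
  // Reconstructs the key and tab-separated value produced by incrementCount;
  // only the layout and key format come from the controller, the values do not.
  val ts = System.currentTimeMillis().toString
  val (service, action, item) = ("s1", "click", "item_1")
  val dimension = Json.obj("gender" -> "M")
  val property  = Json.obj("value" -> 1)

  val key = s"$ts.$item"   // used as the record key for hash partitioning
  val msg = List(ts, service, action, item, dimension, property).mkString("\t")
  println(s"$key\n$msg")
}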
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2rest_play/app/org/apache/s2graph/rest/play/controllers/EdgeController.scala ---------------------------------------------------------------------- diff --git a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/EdgeController.scala b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/EdgeController.scala new file mode 100644 index 0000000..6c83743 --- /dev/null +++ b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/EdgeController.scala @@ -0,0 +1,219 @@ +package org.apache.s2graph.rest.play.controllers + +import org.apache.kafka.clients.producer.ProducerRecord +import org.apache.s2graph.core._ +import org.apache.s2graph.core.mysqls.Label +import org.apache.s2graph.core.rest.RequestParser +import org.apache.s2graph.core.utils.logger +import org.apache.s2graph.rest.play.actors.QueueActor +import org.apache.s2graph.rest.play.config.Config +import play.api.libs.json._ +import play.api.mvc.{Controller, Result} + +import scala.collection.Seq +import scala.concurrent.Future + +object EdgeController extends Controller { + + import ApplicationController._ + import ExceptionHandler._ + import play.api.libs.concurrent.Execution.Implicits._ + + private val s2: Graph = org.apache.s2graph.rest.play.Global.s2graph + private val requestParser: RequestParser = org.apache.s2graph.rest.play.Global.s2parser + private def jsToStr(js: JsValue): String = js match { + case JsString(s) => s + case _ => js.toString() + } + + def toTsv(jsValue: JsValue, op: String): String = { + val ts = jsToStr(jsValue \ "timestamp") + val from = jsToStr(jsValue \ "from") + val to = jsToStr(jsValue \ "to") + val label = jsToStr(jsValue \ "label") + val props = (jsValue \ "props").asOpt[JsObject].getOrElse(Json.obj()) + + (jsValue \ "direction").asOpt[String] match { + case None => Seq(ts, op, "e", from, to, label, props).mkString("\t") + case Some(dir) => Seq(ts, op, "e", from, to, label, props, dir).mkString("\t") + } + } + + def tryMutates(jsValue: JsValue, operation: String, withWait: Boolean = false): Future[Result] = { + if (!Config.IS_WRITE_SERVER) Future.successful(Unauthorized) + + else { + try { + logger.debug(s"$jsValue") + val (edges, jsOrgs) = requestParser.toEdgesWithOrg(jsValue, operation) + + for ((edge, orgJs) <- edges.zip(jsOrgs)) { + if (edge.isAsync) + ExceptionHandler.enqueue(toKafkaMessage(Config.KAFKA_LOG_TOPIC_ASYNC, edge, Option(toTsv(orgJs, operation)))) + else + ExceptionHandler.enqueue(toKafkaMessage(Config.KAFKA_LOG_TOPIC, edge, Option(toTsv(orgJs, operation)))) + } + + val edgesToStore = edges.filterNot(e => e.isAsync) + + if (withWait) { + val rets = s2.mutateEdges(edgesToStore, withWait = true) + rets.map(Json.toJson(_)).map(jsonResponse(_)) + } else { + val rets = edgesToStore.map { edge => QueueActor.router ! 
edge; true } + Future.successful(jsonResponse(Json.toJson(rets))) + } + } catch { + case e: GraphExceptions.JsonParseException => Future.successful(BadRequest(s"$e")) + case e: Exception => + logger.error(s"mutateAndPublish: $e", e) + Future.successful(InternalServerError(s"${e.getStackTrace}")) + } + } + } + + def mutateAndPublish(str: String, withWait: Boolean = false): Future[Result] = { + if (!Config.IS_WRITE_SERVER) Future.successful(Unauthorized) + + logger.debug(s"$str") + val edgeStrs = str.split("\\n") + + var vertexCnt = 0L + var edgeCnt = 0L + try { + val elements = + for (edgeStr <- edgeStrs; str <- GraphUtil.parseString(edgeStr); element <- Graph.toGraphElement(str)) yield { + element match { + case v: Vertex => vertexCnt += 1 + case e: Edge => edgeCnt += 1 + } + if (element.isAsync) { + ExceptionHandler.enqueue(toKafkaMessage(Config.KAFKA_LOG_TOPIC_ASYNC, element, Some(str))) + } else { + ExceptionHandler.enqueue(toKafkaMessage(Config.KAFKA_LOG_TOPIC, element, Some(str))) + } + element + } + + //FIXME: + val elementsToStore = elements.filterNot(e => e.isAsync) + if (withWait) { + val rets = s2.mutateElements(elementsToStore, withWait) + rets.map(Json.toJson(_)).map(jsonResponse(_)) + } else { + val rets = elementsToStore.map { element => QueueActor.router ! element; true } + Future.successful(jsonResponse(Json.toJson(rets))) + } + } catch { + case e: GraphExceptions.JsonParseException => Future.successful(BadRequest(s"$e")) + case e: Throwable => + logger.error(s"mutateAndPublish: $e", e) + Future.successful(InternalServerError(s"${e.getStackTrace}")) + } + } + + def mutateBulk() = withHeaderAsync(parse.text) { request => + mutateAndPublish(request.body, withWait = false) + } + + def mutateBulkWithWait() = withHeaderAsync(parse.text) { request => + mutateAndPublish(request.body, withWait = true) + } + + def inserts() = withHeaderAsync(jsonParser) { request => + tryMutates(request.body, "insert") + } + + def insertsWithWait() = withHeaderAsync(jsonParser) { request => + tryMutates(request.body, "insert", withWait = true) + } + + def insertsBulk() = withHeaderAsync(jsonParser) { request => + tryMutates(request.body, "insertBulk") + } + + def deletes() = withHeaderAsync(jsonParser) { request => + tryMutates(request.body, "delete") + } + + def deletesWithWait() = withHeaderAsync(jsonParser) { request => + tryMutates(request.body, "delete", withWait = true) + } + + def updates() = withHeaderAsync(jsonParser) { request => + tryMutates(request.body, "update") + } + + def updatesWithWait() = withHeaderAsync(jsonParser) { request => + tryMutates(request.body, "update", withWait = true) + } + + def increments() = withHeaderAsync(jsonParser) { request => + tryMutates(request.body, "increment") + } + + def incrementsWithWait() = withHeaderAsync(jsonParser) { request => + tryMutates(request.body, "increment", withWait = true) + } + + def incrementCounts() = withHeaderAsync(jsonParser) { request => + val jsValue = request.body + val edges = requestParser.toEdges(jsValue, "incrementCount") + + s2.incrementCounts(edges, withWait = true).map { results => + val json = results.map { case (isSuccess, resultCount) => + Json.obj("success" -> isSuccess, "result" -> resultCount) + } + + jsonResponse(Json.toJson(json)) + } + } + + def deleteAll() = withHeaderAsync(jsonParser) { request => +// deleteAllInner(request.body, withWait = false) + deleteAllInner(request.body, withWait = true) + } + + def deleteAllInner(jsValue: JsValue, withWait: Boolean) = { + + /** logging for delete all request */ + def 
enqueueLogMessage(ids: Seq[JsValue], labels: Seq[Label], ts: Long, direction: String, topicOpt: Option[String]) = { + val kafkaMessages = for { + id <- ids + label <- labels + } yield { + val tsv = Seq(ts, "deleteAll", "e", jsToStr(id), jsToStr(id), label.label, "{}", direction).mkString("\t") + val topic = topicOpt.getOrElse { + if (label.isAsync) Config.KAFKA_LOG_TOPIC_ASYNC else Config.KAFKA_LOG_TOPIC + } + + val kafkaMsg = KafkaMessage(new ProducerRecord[Key, Val](topic, null, tsv)) + kafkaMsg + } + + ExceptionHandler.enqueues(kafkaMessages) + } + + def deleteEach(labels: Seq[Label], direction: String, ids: Seq[JsValue], ts: Long, vertices: Seq[Vertex]) = { + enqueueLogMessage(ids, labels, ts, direction, None) + val future = s2.deleteAllAdjacentEdges(vertices.toList, labels, GraphUtil.directions(direction), ts) + if (withWait) { + future + } else { + Future.successful(true) + } + } + + val deleteFutures = jsValue.as[Seq[JsValue]].map { json => + val (labels, direction, ids, ts, vertices) = requestParser.toDeleteParam(json) + if (labels.isEmpty || ids.isEmpty) Future.successful(true) + else deleteEach(labels, direction, ids, ts, vertices) + } + + val deleteResults = Future.sequence(deleteFutures) + deleteResults.map { rst => + logger.debug(s"deleteAllInner: $rst") + Ok(s"deleted... ${rst.toString()}") + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2rest_play/app/org/apache/s2graph/rest/play/controllers/ExperimentController.scala ---------------------------------------------------------------------- diff --git a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/ExperimentController.scala b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/ExperimentController.scala new file mode 100644 index 0000000..b489a81 --- /dev/null +++ b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/ExperimentController.scala @@ -0,0 +1,23 @@ +package org.apache.s2graph.rest.play.controllers + +import org.apache.s2graph.core.mysqls.Experiment +import org.apache.s2graph.core.rest.RestHandler +import play.api.mvc._ + +import scala.concurrent.ExecutionContext.Implicits.global + +object ExperimentController extends Controller { + private val rest: RestHandler = org.apache.s2graph.rest.play.Global.s2rest + + import ApplicationController._ + + def experiment(accessToken: String, experimentName: String, uuid: String) = withHeaderAsync(jsonText) { request => + val body = request.body + + val res = rest.doPost(request.uri, body, request.headers.get(Experiment.impressionKey)) + res.body.map { case js => + val headers = res.headers :+ ("result_size" -> rest.calcSize(js).toString) + jsonResponse(js, headers: _*) + } recoverWith ApplicationController.requestFallback(body) + } +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2rest_play/app/org/apache/s2graph/rest/play/controllers/JsonBodyParser.scala ---------------------------------------------------------------------- diff --git a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/JsonBodyParser.scala b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/JsonBodyParser.scala new file mode 100644 index 0000000..42710e1 --- /dev/null +++ b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/JsonBodyParser.scala @@ -0,0 +1,86 @@ +package org.apache.s2graph.rest.play.controllers + +import org.apache.s2graph.core.utils.logger +import play.api.Play +import play.api.libs.iteratee.Iteratee +import play.api.libs.json.{JsValue, Json} +import play.api.mvc._ + +import 
scala.concurrent.Future +import scala.util.control.NonFatal + +object s2parse extends BodyParsers { + + import parse._ + + val defaultMaxTextLength = 1024 * 512 + val defaultMaxJsonLength = 1024 * 512 + + def json: BodyParser[JsValue] = json(defaultMaxTextLength) + + /** + * parseText with application/json header for Pre-Process text + */ + def jsonText: BodyParser[String] = when( + _.contentType.exists(m => m.equalsIgnoreCase("text/json") || m.equalsIgnoreCase("application/json")), + jsonText(defaultMaxTextLength), + createBadResult("Expecting text/json or application/json body") + ) + + private def jsonText(maxLength: Int): BodyParser[String] = BodyParser("json, maxLength=" + maxLength) { request => + import play.api.libs.iteratee.Execution.Implicits.trampoline + import play.api.libs.iteratee.Traversable + + Traversable.takeUpTo[Array[Byte]](maxLength) + .transform(Iteratee.consume[Array[Byte]]().map(c => new String(c, "UTF-8"))) + .flatMap(Iteratee.eofOrElse(Results.EntityTooLarge)) + } + + def json(maxLength: Int): BodyParser[JsValue] = when( + _.contentType.exists(m => m.equalsIgnoreCase("text/json") || m.equalsIgnoreCase("application/json")), + tolerantJson(maxLength), + createBadResult("Expecting text/json or application/json body") + ) + + def tolerantJson(maxLength: Int): BodyParser[JsValue] = + tolerantBodyParser[JsValue]("json", maxLength, "Invalid Json") { (request, bytes) => + // Encoding notes: RFC 4627 requires that JSON be encoded in Unicode, and states that whether that's + // UTF-8, UTF-16 or UTF-32 can be auto detected by reading the first two bytes. So we ignore the declared + // charset and don't decode, we passing the byte array as is because Jackson supports auto detection. + Json.parse(bytes) + } + + private def tolerantBodyParser[A](name: String, maxLength: Int, errorMessage: String)(parser: (RequestHeader, Array[Byte]) => A): BodyParser[A] = + BodyParser(name + ", maxLength=" + maxLength) { request => + import play.api.libs.iteratee.Execution.Implicits.trampoline + import play.api.libs.iteratee.Traversable + + import scala.util.control.Exception._ + + val bodyParser: Iteratee[Array[Byte], Either[Result, Either[Future[Result], A]]] = + Traversable.takeUpTo[Array[Byte]](maxLength).transform( + Iteratee.consume[Array[Byte]]().map { bytes => + allCatch[A].either { + parser(request, bytes) + }.left.map { + case NonFatal(e) => + val txt = new String(bytes) + logger.error(s"$errorMessage: $txt", e) + createBadResult(s"$errorMessage: $e")(request) + case t => throw t + } + } + ).flatMap(Iteratee.eofOrElse(Results.EntityTooLarge)) + + bodyParser.mapM { + case Left(tooLarge) => Future.successful(Left(tooLarge)) + case Right(Left(badResult)) => badResult.map(Left.apply) + case Right(Right(body)) => Future.successful(Right(body)) + } + } + + private def createBadResult(msg: String): RequestHeader => Future[Result] = { request => + Play.maybeApplication.map(_.global.onBadRequest(request, msg)) + .getOrElse(Future.successful(Results.BadRequest)) + } +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2rest_play/app/org/apache/s2graph/rest/play/controllers/PublishController.scala ---------------------------------------------------------------------- diff --git a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/PublishController.scala b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/PublishController.scala new file mode 100644 index 0000000..fe478e6 --- /dev/null +++ 
b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/PublishController.scala @@ -0,0 +1,54 @@ +package org.apache.s2graph.rest.play.controllers + +import org.apache.kafka.clients.producer.ProducerRecord +import org.apache.s2graph.core.ExceptionHandler +import org.apache.s2graph.rest.play.config.Config +import play.api.mvc._ + +import scala.concurrent.Future + +object PublishController extends Controller { + + import ApplicationController._ + import ExceptionHandler._ + import play.api.libs.concurrent.Execution.Implicits._ + + /** + * never check validation on string. just redirect strings to kafka. + */ + val serviceNotExistException = new RuntimeException(s"service is not created in s2graph. create service first.") + + // private def toService(topic: String): String = { + // Service.findByName(topic).map(service => s"${service.serviceName}-${Config.PHASE}").getOrElse(throw serviceNotExistException) + // } + def publishOnly(topic: String) = withHeaderAsync(parse.text) { request => + if (!Config.IS_WRITE_SERVER) Future.successful(UNAUTHORIZED) + // val kafkaTopic = toService(topic) + val strs = request.body.split("\n") + strs.foreach(str => { + val keyedMessage = new ProducerRecord[Key, Val](Config.KAFKA_LOG_TOPIC, str) + // val keyedMessage = new ProducerRecord[Key, Val](kafkaTopic, s"$str") + // logger.debug(s"$kafkaTopic, $str") + ExceptionHandler.enqueue(KafkaMessage(keyedMessage)) + }) + Future.successful( + Ok("publish success.\n").withHeaders(CONNECTION -> "Keep-Alive", "Keep-Alive" -> "timeout=10, max=10") + ) + // try { + // + // } catch { + // case e: Exception => Future.successful(BadRequest(e.getMessage)) + // } + } + + def publish(topic: String) = publishOnly(topic) + + // def mutateBulk(topic: String) = Action.async(parse.text) { request => + // EdgeController.mutateAndPublish(Config.KAFKA_LOG_TOPIC, Config.KAFKA_FAIL_TOPIC, request.body).map { result => + // result.withHeaders(CONNECTION -> "Keep-Alive", "Keep-Alive" -> "timeout=10, max=10") + // } + // } + def mutateBulk(topic: String) = withHeaderAsync(parse.text) { request => + EdgeController.mutateAndPublish(request.body) + } +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2rest_play/app/org/apache/s2graph/rest/play/controllers/QueryController.scala ---------------------------------------------------------------------- diff --git a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/QueryController.scala b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/QueryController.scala new file mode 100644 index 0000000..e509554 --- /dev/null +++ b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/QueryController.scala @@ -0,0 +1,52 @@ +package org.apache.s2graph.rest.play.controllers + +import org.apache.s2graph.core.JSONParser +import org.apache.s2graph.core.mysqls.Experiment +import org.apache.s2graph.core.rest.RestHandler +import play.api.libs.json.Json +import play.api.mvc._ + +import scala.language.postfixOps + +object QueryController extends Controller with JSONParser { + + import ApplicationController._ + import play.api.libs.concurrent.Execution.Implicits.defaultContext + + private val rest: RestHandler = org.apache.s2graph.rest.play.Global.s2rest + + def delegate(request: Request[String]) = { + rest.doPost(request.uri, request.body, request.headers.get(Experiment.impressionKey)).body.map { + js => + jsonResponse(js, "result_size" -> rest.calcSize(js).toString) + } recoverWith ApplicationController.requestFallback(request.body) + } + + def getEdges() = 
withHeaderAsync(jsonText)(delegate) + + def getEdgesWithGrouping() = withHeaderAsync(jsonText)(delegate) + + def getEdgesExcluded() = withHeaderAsync(jsonText)(delegate) + + def getEdgesExcludedWithGrouping() = withHeaderAsync(jsonText)(delegate) + + def checkEdges() = withHeaderAsync(jsonText)(delegate) + + def getEdgesGrouped() = withHeaderAsync(jsonText)(delegate) + + def getEdgesGroupedExcluded() = withHeaderAsync(jsonText)(delegate) + + def getEdgesGroupedExcludedFormatted() = withHeaderAsync(jsonText)(delegate) + + def getEdge(srcId: String, tgtId: String, labelName: String, direction: String) = + withHeaderAsync(jsonText) { + request => + val params = Json.arr(Json.obj("label" -> labelName, "direction" -> direction, "from" -> srcId, "to" -> tgtId)) + rest.checkEdges(params).body.map { + js => + jsonResponse(js, "result_size" -> rest.calcSize(js).toString) + } recoverWith ApplicationController.requestFallback(request.body) + } + + def getVertices() = withHeaderAsync(jsonText)(delegate) +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2rest_play/app/org/apache/s2graph/rest/play/controllers/VertexController.scala ---------------------------------------------------------------------- diff --git a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/VertexController.scala b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/VertexController.scala new file mode 100644 index 0000000..b20fa37 --- /dev/null +++ b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/VertexController.scala @@ -0,0 +1,85 @@ +package org.apache.s2graph.rest.play.controllers + +import org.apache.s2graph.core.rest.RequestParser +import org.apache.s2graph.core.utils.logger +import org.apache.s2graph.core.{ExceptionHandler, Graph, GraphExceptions} +import org.apache.s2graph.rest.play.actors.QueueActor +import org.apache.s2graph.rest.play.config.Config +import play.api.libs.json.{JsValue, Json} +import play.api.mvc.{Controller, Result} + +import scala.concurrent.Future + +object VertexController extends Controller { + private val s2: Graph = org.apache.s2graph.rest.play.Global.s2graph + private val requestParser: RequestParser = org.apache.s2graph.rest.play.Global.s2parser + + import ApplicationController._ + import ExceptionHandler._ + import play.api.libs.concurrent.Execution.Implicits._ + + def tryMutates(jsValue: JsValue, operation: String, serviceNameOpt: Option[String] = None, columnNameOpt: Option[String] = None, withWait: Boolean = false): Future[Result] = { + if (!Config.IS_WRITE_SERVER) Future.successful(Unauthorized) + else { + try { + val vertices = requestParser.toVertices(jsValue, operation, serviceNameOpt, columnNameOpt) + + for (vertex <- vertices) { + if (vertex.isAsync) + ExceptionHandler.enqueue(toKafkaMessage(Config.KAFKA_LOG_TOPIC_ASYNC, vertex, None)) + else + ExceptionHandler.enqueue(toKafkaMessage(Config.KAFKA_LOG_TOPIC, vertex, None)) + } + + //FIXME: + val verticesToStore = vertices.filterNot(v => v.isAsync) + + if (withWait) { + val rets = s2.mutateVertices(verticesToStore, withWait = true) + rets.map(Json.toJson(_)).map(jsonResponse(_)) + } else { + val rets = verticesToStore.map { vertex => QueueActor.router ! 
vertex; true } + Future.successful(jsonResponse(Json.toJson(rets))) + } + } catch { + case e: GraphExceptions.JsonParseException => Future.successful(BadRequest(s"$e")) + case e: Exception => + logger.error(s"[Failed] tryMutates", e) + Future.successful(InternalServerError(s"${e.getStackTrace}")) + } + } + } + + def inserts() = withHeaderAsync(jsonParser) { request => + tryMutates(request.body, "insert") + } + + def insertsWithWait() = withHeaderAsync(jsonParser) { request => + tryMutates(request.body, "insert", withWait = true) + } + + def insertsSimple(serviceName: String, columnName: String) = withHeaderAsync(jsonParser) { request => + tryMutates(request.body, "insert", Some(serviceName), Some(columnName)) + } + + def deletes() = withHeaderAsync(jsonParser) { request => + tryMutates(request.body, "delete") + } + + def deletesWithWait() = withHeaderAsync(jsonParser) { request => + tryMutates(request.body, "delete", withWait = true) + } + + def deletesSimple(serviceName: String, columnName: String) = withHeaderAsync(jsonParser) { request => + tryMutates(request.body, "delete", Some(serviceName), Some(columnName)) + } + + def deletesAll() = withHeaderAsync(jsonParser) { request => + tryMutates(request.body, "deleteAll") + } + + def deletesAllSimple(serviceName: String, columnName: String) = withHeaderAsync(jsonParser) { request => + tryMutates(request.body, "deleteAll", Some(serviceName), Some(columnName)) + } + +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2rest_play/app/org/apache/s2graph/rest/play/models/ExactCounterItem.scala ---------------------------------------------------------------------- diff --git a/s2rest_play/app/org/apache/s2graph/rest/play/models/ExactCounterItem.scala b/s2rest_play/app/org/apache/s2graph/rest/play/models/ExactCounterItem.scala new file mode 100644 index 0000000..df10aab --- /dev/null +++ b/s2rest_play/app/org/apache/s2graph/rest/play/models/ExactCounterItem.scala @@ -0,0 +1,38 @@ +package org.apache.s2graph.rest.play.models + +import play.api.libs.json.{Json, Writes} + +/** + * Created by alec on 15. 4. 15..
+ */ +case class ExactCounterItem(ts: Long, count: Long, score: Double) + +case class ExactCounterIntervalItem(interval: String, dimension: Map[String, String], counter: Seq[ExactCounterItem]) + +case class ExactCounterResultMeta(service: String, action: String, item: String) + +case class ExactCounterResult(meta: ExactCounterResultMeta, data: Seq[ExactCounterIntervalItem]) + +object ExactCounterItem { + implicit val writes = new Writes[ExactCounterItem] { + def writes(item: ExactCounterItem) = Json.obj( + "ts" -> item.ts, + "time" -> tsFormat.format(item.ts), + "count" -> item.count, + "score" -> item.score + ) + } + implicit val reads = Json.reads[ExactCounterItem] +} + +object ExactCounterIntervalItem { + implicit val format = Json.format[ExactCounterIntervalItem] +} + +object ExactCounterResultMeta { + implicit val format = Json.format[ExactCounterResultMeta] +} + +object ExactCounterResult { + implicit val formats = Json.format[ExactCounterResult] +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2rest_play/app/org/apache/s2graph/rest/play/models/RankCounterItem.scala ---------------------------------------------------------------------- diff --git a/s2rest_play/app/org/apache/s2graph/rest/play/models/RankCounterItem.scala b/s2rest_play/app/org/apache/s2graph/rest/play/models/RankCounterItem.scala new file mode 100644 index 0000000..3d9ef41 --- /dev/null +++ b/s2rest_play/app/org/apache/s2graph/rest/play/models/RankCounterItem.scala @@ -0,0 +1,40 @@ +package org.apache.s2graph.rest.play.models + +import play.api.libs.json.{Json, Writes} + +/** + * Created by alec on 15. 4. 15.. + */ +case class RankCounterItem(rank: Int, id: String, score: Double) + +case class RankCounterDimensionItem(interval: String, ts: Long, dimension: String, total: Double, ranks: Seq[RankCounterItem]) + +case class RankCounterResultMeta(service: String, action: String) + +case class RankCounterResult(meta: RankCounterResultMeta, data: Seq[RankCounterDimensionItem]) + +object RankCounterItem { + implicit val format = Json.format[RankCounterItem] +} + +object RankCounterDimensionItem { + implicit val writes = new Writes[RankCounterDimensionItem] { + def writes(item: RankCounterDimensionItem) = Json.obj( + "interval" -> item.interval, + "ts" -> item.ts, + "time" -> tsFormat.format(item.ts), + "dimension" -> item.dimension, + "total" -> item.total, + "ranks" -> item.ranks + ) + } + implicit val reads = Json.reads[RankCounterDimensionItem] +} + +object RankCounterResultMeta { + implicit val format = Json.format[RankCounterResultMeta] +} + +object RankCounterResult { + implicit val format = Json.format[RankCounterResult] +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2rest_play/app/org/apache/s2graph/rest/play/models/package.scala ---------------------------------------------------------------------- diff --git a/s2rest_play/app/org/apache/s2graph/rest/play/models/package.scala b/s2rest_play/app/org/apache/s2graph/rest/play/models/package.scala new file mode 100644 index 0000000..08b0d45 --- /dev/null +++ b/s2rest_play/app/org/apache/s2graph/rest/play/models/package.scala @@ -0,0 +1,10 @@ +package org.apache.s2graph.rest.play + +import java.text.SimpleDateFormat + +/** + * Created by alec on 15. 4. 20.. 
+ */ +package object models { + def tsFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss") +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2rest_play/app/util/TestDataLoader.scala ---------------------------------------------------------------------- diff --git a/s2rest_play/app/util/TestDataLoader.scala b/s2rest_play/app/util/TestDataLoader.scala deleted file mode 100644 index 45a9b61..0000000 --- a/s2rest_play/app/util/TestDataLoader.scala +++ /dev/null @@ -1,70 +0,0 @@ -package util - -import java.io.File - -import scala.collection.mutable.{ArrayBuffer, HashMap, ListBuffer} -import scala.io.Source -import scala.util.Random - - -object TestDataLoader { - val step = 100 - val prob = 1.0 - val (testIds, testIdsHist, testIdsHistCnt) = loadSeeds("./talk_vertices.txt") - val maxId = testIds.length - // val randoms = (0 until 100).map{ i => new SecureRandom } - // val idx = new AtomicInteger(0) - // def randomId() = { - // val r = randoms(idx.getAndIncrement() % randoms.size) - // testAccountIds(r.nextInt(maxId)) - // } - def randomId(histStep: Int) = { - for { - maxId <- testIdsHistCnt.get(histStep) - rIdx = Random.nextInt(maxId.toInt) - hist <- testIdsHist.get(histStep) - id = hist(rIdx) - } yield { -// logger.debug(s"randomId: $histStep = $id[$rIdx / $maxId]") - id - } - } - def randomId() = { - val id = testIds(Random.nextInt(maxId)) - // logger.debug(s"$id") - id - } - private def loadSeeds(filePath: String) = { - val histogram = new HashMap[Long, ListBuffer[Long]] - val histogramCnt = new HashMap[Long, Long] - val ids = new ArrayBuffer[Long] - - var idx = 0 -// logger.debug(s"$filePath start to load file.") - for (line <- Source.fromFile(new File(filePath)).getLines) { - // testAccountIds(idx) = line.toLong -// if (idx % 10000 == 0) logger.debug(s"$idx") - idx += 1 - - val parts = line.split("\\t") - val id = parts.head.toLong - val count = parts.last.toLong / step - if (count > 1 && Random.nextDouble < prob) { - histogram.get(count) match { - case None => - histogram.put(count, new ListBuffer[Long]) - histogram.get(count).get += id - histogramCnt.put(count, 1) - case Some(existed) => - existed += id - histogramCnt.put(count, histogramCnt.getOrElse(count, 0L) + 1L) - } - ids += id - } - - } -// logger.debug(s"upload $filePath finished.") -// logger.debug(s"${histogram.size}") - (ids, histogram.map(t => (t._1 -> t._2.toArray[Long])), histogramCnt) - } -} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2rest_play/conf/reference.conf ---------------------------------------------------------------------- diff --git a/s2rest_play/conf/reference.conf b/s2rest_play/conf/reference.conf index 6e76847..452c013 100644 --- a/s2rest_play/conf/reference.conf +++ b/s2rest_play/conf/reference.conf @@ -112,7 +112,7 @@ lock.expire.time=600000 # max allowd edges for deleteAll is multiply of above two configuration. 
# set global obejct package, TODO: remove global -application.global=com.kakao.s2graph.rest.Global +application.global=org.apache.s2graph.rest.play.Global akka { loggers = ["akka.event.slf4j.Slf4jLogger"] http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2rest_play/conf/routes ---------------------------------------------------------------------- diff --git a/s2rest_play/conf/routes b/s2rest_play/conf/routes index 90838c8..f0115c9 100644 --- a/s2rest_play/conf/routes +++ b/s2rest_play/conf/routes @@ -4,123 +4,104 @@ # publish -#POST /publish/:topic controllers.PublishController.publish(topic) -POST /publish/:topic controllers.PublishController.mutateBulk(topic) -POST /publishOnly/:topic controllers.PublishController.publishOnly(topic) +POST /publish/:topic org.apache.s2graph.rest.play.controllers.PublishController.mutateBulk(topic) +POST /publishOnly/:topic org.apache.s2graph.rest.play.controllers.PublishController.publishOnly(topic) #### Health Check -#GET /health_check.html controllers.Assets.at(path="/public", file="health_check.html") -GET /health_check.html controllers.ApplicationController.healthCheck() -PUT /health_check/:isHealthy controllers.ApplicationController.updateHealthCheck(isHealthy: Boolean) +GET /health_check.html org.apache.s2graph.rest.play.controllers.ApplicationController.healthCheck() +PUT /health_check/:isHealthy org.apache.s2graph.rest.play.controllers.ApplicationController.updateHealthCheck(isHealthy: Boolean) ## Edge -POST /graphs/edges/insert controllers.EdgeController.inserts() -POST /graphs/edges/insertWithWait controllers.EdgeController.insertsWithWait() -POST /graphs/edges/insertBulk controllers.EdgeController.insertsBulk() -POST /graphs/edges/delete controllers.EdgeController.deletes() -POST /graphs/edges/deleteWithWait controllers.EdgeController.deletesWithWait() -POST /graphs/edges/deleteAll controllers.EdgeController.deleteAll() -POST /graphs/edges/update controllers.EdgeController.updates() -POST /graphs/edges/updateWithWait controllers.EdgeController.updatesWithWait() -POST /graphs/edges/increment controllers.EdgeController.increments() -POST /graphs/edges/incrementWithWait controllers.EdgeController.incrementsWithWait() -POST /graphs/edges/incrementCount controllers.EdgeController.incrementCounts() -POST /graphs/edges/bulk controllers.EdgeController.mutateBulk() -POST /graphs/edges/bulkWithWait controllers.EdgeController.mutateBulkWithWait() +POST /graphs/edges/insert org.apache.s2graph.rest.play.controllers.EdgeController.inserts() +POST /graphs/edges/insertWithWait org.apache.s2graph.rest.play.controllers.EdgeController.insertsWithWait() +POST /graphs/edges/insertBulk org.apache.s2graph.rest.play.controllers.EdgeController.insertsBulk() +POST /graphs/edges/delete org.apache.s2graph.rest.play.controllers.EdgeController.deletes() +POST /graphs/edges/deleteWithWait org.apache.s2graph.rest.play.controllers.EdgeController.deletesWithWait() +POST /graphs/edges/deleteAll org.apache.s2graph.rest.play.controllers.EdgeController.deleteAll() +POST /graphs/edges/update org.apache.s2graph.rest.play.controllers.EdgeController.updates() +POST /graphs/edges/updateWithWait org.apache.s2graph.rest.play.controllers.EdgeController.updatesWithWait() +POST /graphs/edges/increment org.apache.s2graph.rest.play.controllers.EdgeController.increments() +POST /graphs/edges/incrementWithWait org.apache.s2graph.rest.play.controllers.EdgeController.incrementsWithWait() +POST /graphs/edges/incrementCount 
org.apache.s2graph.rest.play.controllers.EdgeController.incrementCounts() +POST /graphs/edges/bulk org.apache.s2graph.rest.play.controllers.EdgeController.mutateBulk() +POST /graphs/edges/bulkWithWait org.apache.s2graph.rest.play.controllers.EdgeController.mutateBulkWithWait() ## Vertex -POST /graphs/vertices/insert controllers.VertexController.inserts() -POST /graphs/vertices/insertWithWait controllers.VertexController.insertsWithWait() -POST /graphs/vertices/insert/:serviceName/:columnName controllers.VertexController.insertsSimple(serviceName, columnName) -POST /graphs/vertices/delete controllers.VertexController.deletes() -POST /graphs/vertices/deleteWithWait controllers.VertexController.deletesWithWait() -POST /graphs/vertices/delete/:serviceName/:columnName controllers.VertexController.deletesSimple(serviceName, columnName) -POST /graphs/vertices/deleteAll controllers.VertexController.deletesAll() -POST /graphs/vertices/deleteAll/:serviceName/:columnName controllers.VertexController.deletesAllSimple(serviceName, columnName) +POST /graphs/vertices/insert org.apache.s2graph.rest.play.controllers.VertexController.inserts() +POST /graphs/vertices/insertWithWait org.apache.s2graph.rest.play.controllers.VertexController.insertsWithWait() +POST /graphs/vertices/insert/:serviceName/:columnName org.apache.s2graph.rest.play.controllers.VertexController.insertsSimple(serviceName, columnName) +POST /graphs/vertices/delete org.apache.s2graph.rest.play.controllers.VertexController.deletes() +POST /graphs/vertices/deleteWithWait org.apache.s2graph.rest.play.controllers.VertexController.deletesWithWait() +POST /graphs/vertices/delete/:serviceName/:columnName org.apache.s2graph.rest.play.controllers.VertexController.deletesSimple(serviceName, columnName) +POST /graphs/vertices/deleteAll org.apache.s2graph.rest.play.controllers.VertexController.deletesAll() +POST /graphs/vertices/deleteAll/:serviceName/:columnName org.apache.s2graph.rest.play.controllers.VertexController.deletesAllSimple(serviceName, columnName) ### SELECT Edges -POST /graphs/getEdges controllers.QueryController.getEdges() -POST /graphs/getEdges/grouped controllers.QueryController.getEdgesWithGrouping() -POST /graphs/getEdgesExcluded controllers.QueryController.getEdgesExcluded() -POST /graphs/getEdgesExcluded/grouped controllers.QueryController.getEdgesExcludedWithGrouping() -POST /graphs/checkEdges controllers.QueryController.checkEdges() +POST /graphs/getEdges org.apache.s2graph.rest.play.controllers.QueryController.getEdges() +POST /graphs/getEdges/grouped org.apache.s2graph.rest.play.controllers.QueryController.getEdgesWithGrouping() +POST /graphs/getEdgesExcluded org.apache.s2graph.rest.play.controllers.QueryController.getEdgesExcluded() +POST /graphs/getEdgesExcluded/grouped org.apache.s2graph.rest.play.controllers.QueryController.getEdgesExcludedWithGrouping() +POST /graphs/checkEdges org.apache.s2graph.rest.play.controllers.QueryController.checkEdges() ### this will be deprecated -POST /graphs/getEdgesGrouped controllers.QueryController.getEdgesGrouped() -POST /graphs/getEdgesGroupedExcluded controllers.QueryController.getEdgesGroupedExcluded() -POST /graphs/getEdgesGroupedExcludedFormatted controllers.QueryController.getEdgesGroupedExcludedFormatted() -GET /graphs/getEdge/:srcId/:tgtId/:labelName/:direction controllers.QueryController.getEdge(srcId, tgtId, labelName, direction) +POST /graphs/getEdgesGrouped org.apache.s2graph.rest.play.controllers.QueryController.getEdgesGrouped() +POST /graphs/getEdgesGroupedExcluded 
org.apache.s2graph.rest.play.controllers.QueryController.getEdgesGroupedExcluded() +POST /graphs/getEdgesGroupedExcludedFormatted org.apache.s2graph.rest.play.controllers.QueryController.getEdgesGroupedExcludedFormatted() +GET /graphs/getEdge/:srcId/:tgtId/:labelName/:direction org.apache.s2graph.rest.play.controllers.QueryController.getEdge(srcId, tgtId, labelName, direction) ### SELECT Vertices -#POST /graphs/getVertex controllers.QueryController.getVertex() -POST /graphs/getVertices controllers.QueryController.getVertices() +POST /graphs/getVertices org.apache.s2graph.rest.play.controllers.QueryController.getVertices() #### ADMIN -POST /graphs/createService controllers.AdminController.createService() -GET /graphs/getService/:serviceName controllers.AdminController.getService(serviceName) -GET /graphs/getLabels/:serviceName controllers.AdminController.getLabels(serviceName) -POST /graphs/createLabel controllers.AdminController.createLabel() -POST /graphs/addIndex controllers.AdminController.addIndex() -GET /graphs/getLabel/:labelName controllers.AdminController.getLabel(labelName) -PUT /graphs/deleteLabel/:labelName controllers.AdminController.deleteLabel(labelName) +POST /graphs/createService org.apache.s2graph.rest.play.controllers.AdminController.createService() +GET /graphs/getService/:serviceName org.apache.s2graph.rest.play.controllers.AdminController.getService(serviceName) +GET /graphs/getLabels/:serviceName org.apache.s2graph.rest.play.controllers.AdminController.getLabels(serviceName) +POST /graphs/createLabel org.apache.s2graph.rest.play.controllers.AdminController.createLabel() +POST /graphs/addIndex org.apache.s2graph.rest.play.controllers.AdminController.addIndex() +GET /graphs/getLabel/:labelName org.apache.s2graph.rest.play.controllers.AdminController.getLabel(labelName) +PUT /graphs/deleteLabel/:labelName org.apache.s2graph.rest.play.controllers.AdminController.deleteLabel(labelName) + +POST /graphs/addProp/:labelName org.apache.s2graph.rest.play.controllers.AdminController.addProp(labelName) +POST /graphs/createServiceColumn org.apache.s2graph.rest.play.controllers.AdminController.createServiceColumn() +PUT /graphs/deleteServiceColumn/:serviceName/:columnName org.apache.s2graph.rest.play.controllers.AdminController.deleteServiceColumn(serviceName, columnName) +POST /graphs/addServiceColumnProp/:serviceName/:columnName org.apache.s2graph.rest.play.controllers.AdminController.addServiceColumnProp(serviceName, columnName) +POST /graphs/addServiceColumnProps/:serviceName/:columnName org.apache.s2graph.rest.play.controllers.AdminController.addServiceColumnProps(serviceName, columnName) +GET /graphs/getServiceColumn/:serviceName/:columnName org.apache.s2graph.rest.play.controllers.AdminController.getServiceColumn(serviceName, columnName) +POST /graphs/createHTable org.apache.s2graph.rest.play.controllers.AdminController.createHTable() -POST /graphs/addProp/:labelName controllers.AdminController.addProp(labelName) -POST /graphs/createServiceColumn controllers.AdminController.createServiceColumn() -PUT /graphs/deleteServiceColumn/:serviceName/:columnName controllers.AdminController.deleteServiceColumn(serviceName, columnName) -POST /graphs/addServiceColumnProp/:serviceName/:columnName controllers.AdminController.addServiceColumnProp(serviceName, columnName) -POST /graphs/addServiceColumnProps/:serviceName/:columnName controllers.AdminController.addServiceColumnProps(serviceName, columnName) -GET /graphs/getServiceColumn/:serviceName/:columnName 
controllers.AdminController.getServiceColumn(serviceName, columnName) -POST /graphs/createHTable controllers.AdminController.createHTable() +# AdminController API +GET /admin/labels/:serviceName org.apache.s2graph.rest.play.controllers.AdminController.getLabels(serviceName) +POST /graphs/copyLabel/:oldLabelName/:newLabelName org.apache.s2graph.rest.play.controllers.AdminController.copyLabel(oldLabelName, newLabelName) +POST /graphs/renameLabel/:oldLabelName/:newLabelName org.apache.s2graph.rest.play.controllers.AdminController.renameLabel(oldLabelName, newLabelName) +POST /graphs/updateHTable/:labelName/:newHTableName org.apache.s2graph.rest.play.controllers.AdminController.updateHTable(labelName, newHTableName) +PUT /graphs/loadCache org.apache.s2graph.rest.play.controllers.AdminController.loadCache() + + +# Counter Admin API +POST /counter/v1/:service/:action org.apache.s2graph.rest.play.controllers.CounterController.createAction(service, action) +GET /counter/v1/:service/:action org.apache.s2graph.rest.play.controllers.CounterController.getAction(service, action) +PUT /counter/v1/:service/:action org.apache.s2graph.rest.play.controllers.CounterController.updateAction(service, action) +PUT /counter/v1/:service/:action/prepare org.apache.s2graph.rest.play.controllers.CounterController.prepareAction(service, action) +DELETE /counter/v1/:service/:action org.apache.s2graph.rest.play.controllers.CounterController.deleteAction(service, action) +# Counter API +GET /counter/v1/:service/:action/ranking org.apache.s2graph.rest.play.controllers.CounterController.getRankingCountAsync(service, action) +DELETE /counter/v1/:service/:action/ranking org.apache.s2graph.rest.play.controllers.CounterController.deleteRankingCount(service, action) +GET /counter/v1/:service/:action/:item org.apache.s2graph.rest.play.controllers.CounterController.getExactCountAsync(service, action, item) +PUT /counter/v1/:service/:action/:item org.apache.s2graph.rest.play.controllers.CounterController.incrementCount(service, action, item) +POST /counter/v1/mget org.apache.s2graph.rest.play.controllers.CounterController.getExactCountAsyncMulti() +# Experiment API +POST /graphs/experiment/:accessToken/:experimentName/:uuid org.apache.s2graph.rest.play.controllers.ExperimentController.experiment(accessToken, experimentName, uuid) -#### TEST -#GET /graphs/testGetEdges/:label/:limit/:friendCntStep controllers.QueryController.testGetEdges(label, limit: Int, friendCntStep: Int) -#GET /graphs/testGetEdges2/:label1/:limit1/:label2/:limit2 controllers.QueryController.testGetEdges2(label1, limit1: Int, label2, limit2: Int) -#GET /graphs/testGetEdges3/:label1/:limit1/:label2/:limit2/:label3/:limit3 controllers.QueryController.testGetEdges3(label1, limit1: Int, label2, limit2: Int, label3, limit3: Int) -POST /ping controllers.TestController.ping() -POST /pingAsync controllers.TestController.pingAsync() -GET /graphs/testId controllers.TestController.getRandomId() # Map static resources from the /public folder to the /assets URL path GET /images/*file controllers.Assets.at(path="/public/images", file) GET /javascripts/*file controllers.Assets.at(path="/public/javascripts", file) GET /stylesheets/*file controllers.Assets.at(path="/public/stylesheets", file) GET /font-awesome-4.1.0/*file controllers.Assets.at(path="/public/font-awesome-4.1.0", file) -GET /swagger/*file controllers.Assets.at(path="/public/swagger-ui", file) - - -# AdminController API -#GET /admin/services controllers.AdminController.allServices -GET /admin/labels/:serviceName 
controllers.AdminController.getLabels(serviceName) -#POST /admin/labels/delete/:zkAddr/:tableName/:labelIds/:minTs/:maxTs controllers.AdminController.deleteEdges(zkAddr, tableName, labelIds, minTs: Long, maxTs: Long) -#POST /admin/labels/deleteAll/:zkAddr/:tableName/:minTs/:maxTs controllers.AdminController.deleteAllEdges(zkAddr, tableName, minTs: Long, maxTs: Long) -#POST /admin/swapLabel/:oldLabelName/:newLabelName controllers.AdminController.swapLabel(oldLabelName, newLabelName) -#GET /admin/reloadLabel/:labelName controllers.AdminController.reloadLabel(labelName) -#POST /admin/getEdges controllers.AdminController.getEdges() -POST /graphs/copyLabel/:oldLabelName/:newLabelName controllers.AdminController.copyLabel(oldLabelName, newLabelName) -POST /graphs/renameLabel/:oldLabelName/:newLabelName controllers.AdminController.renameLabel(oldLabelName, newLabelName) -POST /graphs/updateHTable/:labelName/:newHTableName controllers.AdminController.updateHTable(labelName, newHTableName) -PUT /graphs/loadCache controllers.AdminController.loadCache() - - -# Counter Admin API -POST /counter/v1/:service/:action controllers.CounterController.createAction(service, action) -GET /counter/v1/:service/:action controllers.CounterController.getAction(service, action) -PUT /counter/v1/:service/:action controllers.CounterController.updateAction(service, action) -PUT /counter/v1/:service/:action/prepare controllers.CounterController.prepareAction(service, action) -DELETE /counter/v1/:service/:action controllers.CounterController.deleteAction(service, action) - -# Counter API -GET /counter/v1/:service/:action/ranking controllers.CounterController.getRankingCountAsync(service, action) -DELETE /counter/v1/:service/:action/ranking controllers.CounterController.deleteRankingCount(service, action) -GET /counter/v1/:service/:action/:item controllers.CounterController.getExactCountAsync(service, action, item) -PUT /counter/v1/:service/:action/:item controllers.CounterController.incrementCount(service, action, item) -POST /counter/v1/mget controllers.CounterController.getExactCountAsyncMulti() - -# Experiment API -POST /graphs/experiment/:accessToken/:experimentName/:uuid controllers.ExperimentController.experiment(accessToken, experimentName, uuid) +GET /swagger/*file controllers.Assets.at(path="/public/swagger-ui", file) \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2rest_play/test/benchmark/BenchmarkCommon.scala ---------------------------------------------------------------------- diff --git a/s2rest_play/test/benchmark/BenchmarkCommon.scala b/s2rest_play/test/benchmark/BenchmarkCommon.scala deleted file mode 100644 index 48f84c8..0000000 --- a/s2rest_play/test/benchmark/BenchmarkCommon.scala +++ /dev/null @@ -1,24 +0,0 @@ -package benchmark - -import org.specs2.mutable.Specification - -trait BenchmarkCommon extends Specification { - val wrapStr = s"\n==================================================" - - def duration[T](prefix: String = "")(block: => T) = { - val startTs = System.currentTimeMillis() - val ret = block - val endTs = System.currentTimeMillis() - println(s"$wrapStr\n$prefix: took ${endTs - startTs} ms$wrapStr") - ret - } - - def durationWithReturn[T](prefix: String = "")(block: => T): (T, Long) = { - val startTs = System.currentTimeMillis() - val ret = block - val endTs = System.currentTimeMillis() - val duration = endTs - startTs -// println(s"$wrapStr\n$prefix: took $duration ms$wrapStr") - (ret, duration) - } -} 
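
The removed BenchmarkCommon trait did nothing s2graph-specific: it wrapped a by-name block, measured wall-clock time with System.currentTimeMillis, and either printed or returned the elapsed milliseconds. A minimal standalone sketch of the same timing pattern, assuming no specs2 dependency, for anyone who still wants the helper after this deletion:

// Standalone sketch of the timing helpers that BenchmarkCommon provided.
// Not part of the project sources; shown only to document the removed pattern.
object Timing {
  // Runs `block`, prints how long it took, and returns its result.
  def duration[T](prefix: String = "")(block: => T): T = {
    val start = System.currentTimeMillis()
    val ret = block
    println(s"$prefix: took ${System.currentTimeMillis() - start} ms")
    ret
  }

  // Same as above, but returns the elapsed milliseconds alongside the result.
  def durationWithReturn[T](prefix: String = "")(block: => T): (T, Long) = {
    val start = System.currentTimeMillis()
    val ret = block
    (ret, System.currentTimeMillis() - start)
  }
}

// Example usage:
//   val (edges, ms) = Timing.durationWithReturn("getEdges") { runQuery() }
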
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2rest_play/test/benchmark/GraphUtilSpec.scala ---------------------------------------------------------------------- diff --git a/s2rest_play/test/benchmark/GraphUtilSpec.scala b/s2rest_play/test/benchmark/GraphUtilSpec.scala deleted file mode 100644 index b5ce93a..0000000 --- a/s2rest_play/test/benchmark/GraphUtilSpec.scala +++ /dev/null @@ -1,125 +0,0 @@ -package benchmark - -import com.kakao.s2graph.core.{Management, GraphUtil} -import com.kakao.s2graph.core.types.{SourceVertexId, HBaseType, InnerVal, VertexId} -import org.apache.hadoop.hbase.util.Bytes -import play.api.test.{FakeApplication, PlaySpecification} - -import scala.collection.mutable -import scala.collection.mutable.ListBuffer -import scala.util.Random - -class GraphUtilSpec extends BenchmarkCommon with PlaySpecification { - - def between(bytes: Array[Byte], startKey: Array[Byte], endKey: Array[Byte]): Boolean = - Bytes.compareTo(startKey, bytes) <= 0 && Bytes.compareTo(endKey, bytes) >= 0 - - def betweenShort(value: Short, start: Short, end: Short): Boolean = - start <= value && value <= end - - - "GraphUtil" should { - "test murmur3 hash function distribution" in { - val testNum = 1000000 - val bucketSize = Short.MaxValue / 40 - val countsNew = new mutable.HashMap[Int, Int]() - val counts = new mutable.HashMap[Int, Int]() - for { - i <- (0 until testNum) - } { - val h = GraphUtil.murmur3(i.toString) / bucketSize - val hNew = GraphUtil.murmur3Int(i.toString) / bucketSize - counts += (h -> (counts.getOrElse(h, 0) + 1)) - countsNew += (hNew -> (countsNew.getOrElse(hNew, 0) + 1)) - } - val all = counts.toList.sortBy { case (bucket, count) => count }.reverse - val allNew = countsNew.toList.sortBy { case (bucket, count) => count }.reverse - val top = all.take(10) - val bottom = all.takeRight(10) - val topNew = allNew.take(10) - val bottomNew = allNew.takeRight(10) - println(s"Top: $top") - println(s"Bottom: $bottom") - println("-" * 50) - println(s"TopNew: $topNew") - println(s"Bottom: $bottomNew") - true - } - - "test murmur hash skew2" in { - running(FakeApplication()) { - import HBaseType._ - val testNum = 1000000L - val regionCount = 40 - val window = Int.MaxValue / regionCount - val rangeBytes = new ListBuffer[(List[Byte], List[Byte])]() - for { - i <- (0 until regionCount) - } yield { - val startKey = Bytes.toBytes(i * window) - val endKey = Bytes.toBytes((i + 1) * window) - rangeBytes += (startKey.toList -> endKey.toList) - } - - - - val stats = new collection.mutable.HashMap[Int, ((List[Byte], List[Byte]), Long)]() - val counts = new collection.mutable.HashMap[Short, Long]() - stats += (0 -> (rangeBytes.head -> 0L)) - - for (i <- (0L until testNum)) { - val vertexId = SourceVertexId(DEFAULT_COL_ID, InnerVal.withLong(i, HBaseType.DEFAULT_VERSION)) - val bytes = vertexId.bytes - val shortKey = GraphUtil.murmur3(vertexId.innerId.toIdString()) - val shortVal = counts.getOrElse(shortKey, 0L) + 1L - counts += (shortKey -> shortVal) - var j = 0 - var found = false - while (j < rangeBytes.size && !found) { - val (start, end) = rangeBytes(j) - if (between(bytes, start.toArray, end.toArray)) { - found = true - } - j += 1 - } - val head = rangeBytes(j - 1) - val key = j - 1 - val value = stats.get(key) match { - case None => 0L - case Some(v) => v._2 + 1 - } - stats += (key -> (head, value)) - } - val sorted = stats.toList.sortBy(kv => kv._2._2).reverse - println(s"Index: StartBytes ~ EndBytes\tStartShortBytes ~ EndShortBytes\tStartShort ~ 
EndShort\tCount\tShortCount") - sorted.foreach { case (idx, ((start, end), cnt)) => - val startShort = Bytes.toShort(start.take(2).toArray) - val endShort = Bytes.toShort(end.take(2).toArray) - val count = counts.count(t => startShort <= t._1 && t._1 < endShort) - println(s"$idx: $start ~ $end\t${start.take(2)} ~ ${end.take(2)}\t$startShort ~ $endShort\t$cnt\t$count") - - } - println("\n" * 10) - println(s"Index: StartBytes ~ EndBytes\tStartShortBytes ~ EndShortBytes\tStartShort ~ EndShort\tCount\tShortCount") - stats.toList.sortBy(kv => kv._1).reverse.foreach { case (idx, ((start, end), cnt)) => - val startShort = Bytes.toShort(start.take(2).toArray) - val endShort = Bytes.toShort(end.take(2).toArray) - val count = counts.count(t => startShort <= t._1 && t._1 < endShort) - println(s"$idx: $start ~ $end\t${start.take(2)} ~ ${end.take(2)}\t$startShort ~ $endShort\t$cnt\t$count") - - } - } - true - } - - "Bytes compareTo" in { - val x = Array[Byte](11, -12, -26, -14, -23) - val startKey = Array[Byte](0, 0, 0, 0) - val endKey = Array[Byte](12, -52, -52, -52) - println(Bytes.compareTo(startKey, x)) - println(Bytes.compareTo(endKey, x)) - true - } - } - -}
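
The deleted GraphUtilSpec exercised two checks: the bucket distribution of GraphUtil.murmur3 over fixed-width buckets, and whether a row key sorts inside a region's [startKey, endKey] range via org.apache.hadoop.hbase.util.Bytes.compareTo. A minimal sketch of that containment check, using equal Int-width windows as the removed test did (an illustrative assumption, not the production region layout):

// Sketch of the [startKey, endKey] containment check used by the removed spec.
// Assumes only org.apache.hadoop.hbase.util.Bytes on the classpath.
import org.apache.hadoop.hbase.util.Bytes

object RegionRangeCheck {
  // True if `rowKey` sorts between startKey and endKey (both inclusive).
  def between(rowKey: Array[Byte], startKey: Array[Byte], endKey: Array[Byte]): Boolean =
    Bytes.compareTo(startKey, rowKey) <= 0 && Bytes.compareTo(rowKey, endKey) <= 0

  def main(args: Array[String]): Unit = {
    val regionCount = 40
    val window = Int.MaxValue / regionCount
    // Equal-width regions over the positive Int space, mirroring the removed test.
    val ranges = (0 until regionCount).map { i =>
      (Bytes.toBytes(i * window), Bytes.toBytes((i + 1) * window))
    }
    val rowKey = Bytes.toBytes(123456789)
    val regionIdx = ranges.indexWhere { case (start, end) => between(rowKey, start, end) }
    println(s"row key falls into region $regionIdx")
  }
}
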
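
The counter routes added to conf/routes above keep their URL shape; only the controller package prefix changes. A hedged smoke-test sketch against the migrated GET /counter/v1/:service/:action binding (CounterController.getAction) -- the host/port and the myservice/click names are placeholders, and a JDK 11+ HTTP client is assumed:

// Hypothetical smoke test for the migrated counter routes.
// "localhost:9000", "myservice" and "click" are illustrative placeholders only.
import java.net.URI
import java.net.http.{HttpClient, HttpRequest, HttpResponse}

object CounterRouteSmoke {
  def main(args: Array[String]): Unit = {
    val client = HttpClient.newHttpClient()
    // GET /counter/v1/:service/:action -> CounterController.getAction
    val request = HttpRequest.newBuilder(
      URI.create("http://localhost:9000/counter/v1/myservice/click")
    ).GET().build()
    val response = client.send(request, HttpResponse.BodyHandlers.ofString())
    println(s"status=${response.statusCode()} body=${response.body()}")
  }
}
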
