http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/f2311f25/s2rest_play/app/controllers/AdminController.scala ---------------------------------------------------------------------- diff --git a/s2rest_play/app/controllers/AdminController.scala b/s2rest_play/app/controllers/AdminController.scala new file mode 100644 index 0000000..2eb248b --- /dev/null +++ b/s2rest_play/app/controllers/AdminController.scala @@ -0,0 +1,419 @@ +package controllers + +import com.kakao.s2graph.core._ +import com.kakao.s2graph.core.mysqls._ +import com.kakao.s2graph.core.rest.RequestParser +import com.kakao.s2graph.core.utils.logger +import play.api.mvc +import play.api.mvc.{Action, Controller} +import play.api.libs.json._ +import play.api.libs.functional.syntax._ + +import scala.util.{Failure, Success, Try} + +object AdminController extends Controller { + + import ApplicationController._ + private val requestParser: RequestParser = com.kakao.s2graph.rest.Global.s2parser + + /** + * admin message formatter + * @tparam T + */ + trait AdminMessageFormatter[T] { + def toJson(msg: T): JsValue + } + + object AdminMessageFormatter { + implicit def jsValueToJson[T <: JsValue] = new AdminMessageFormatter[T] { + def toJson(js: T) = js + } + + implicit val stringToJson = new AdminMessageFormatter[String] { + def toJson(js: String) = Json.obj("message" -> js) + } + } + + def format[T: AdminMessageFormatter](f: JsValue => play.mvc.Result)(message: T) = { + val formatter = implicitly[AdminMessageFormatter[T]] + f(formatter.toJson(message)) + } + + /** + * ok response + * @param message + * @tparam T + * @return + */ + def ok[T: AdminMessageFormatter](message: T) = { + val formatter = implicitly[AdminMessageFormatter[T]] + Ok(formatter.toJson(message)).as(applicationJsonHeader) + } + + /** + * bad request response + * @param message + * @tparam T + * @return + */ + def bad[T: AdminMessageFormatter](message: T) = { + val formatter = implicitly[AdminMessageFormatter[T]] + BadRequest(formatter.toJson(message)).as(applicationJsonHeader) + } + + /** + * not found response + * @param message + * @tparam T + * @return + */ + def notFound[T: AdminMessageFormatter](message: T) = { + val formatter = implicitly[AdminMessageFormatter[T]] + NotFound(formatter.toJson(message)).as(applicationJsonHeader) + } + + private[AdminController] def tryResponse[T, R: AdminMessageFormatter](res: Try[T])(callback: T => R): mvc.Result = res match { + case Success(m) => + val ret = callback(m) + logger.info(ret.toString) + ok(ret) + case Failure(error) => + logger.error(error.getMessage, error) + error match { + case JsResultException(e) => bad(JsError.toFlatJson(e)) + case _ => bad(error.getMessage) + } + } + + def optionResponse[T, R: AdminMessageFormatter](res: Option[T])(callback: T => R): mvc.Result = res match { + case Some(m) => ok(callback(m)) + case None => notFound("not found") + } + + /** + * load all model cache + * @return + */ + def loadCache() = Action { request => + val startTs = System.currentTimeMillis() + + if (!ApplicationController.isHealthy) { + loadCacheInner() + } + + ok(s"${System.currentTimeMillis() - startTs}") + } + + def loadCacheInner() = { + Service.findAll() + ServiceColumn.findAll() + Label.findAll() + LabelMeta.findAll() + LabelIndex.findAll() + ColumnMeta.findAll() + } + + /** + * read + */ + + /** + * get service info + * @param serviceName + * @return + */ + def getService(serviceName: String) = Action { request => + val serviceOpt = Management.findService(serviceName) + optionResponse(serviceOpt)(_.toJson) + } + + /** + * get label info + * @param labelName + * @return + */ + def getLabel(labelName: String) = Action { request => + val labelOpt = Management.findLabel(labelName) + optionResponse(labelOpt)(_.toJson) + } + + /** + * get all labels of service + * @param serviceName + * @return + */ + def getLabels(serviceName: String) = Action { request => + Service.findByName(serviceName) match { + case None => notFound(s"Service $serviceName not found") + case Some(service) => + val src = Label.findBySrcServiceId(service.id.get) + val tgt = Label.findByTgtServiceId(service.id.get) + + ok(Json.obj("from" -> src.map(_.toJson), "to" -> tgt.map(_.toJson))) + } + } + + /** + * get service columns + * @param serviceName + * @param columnName + * @return + */ + def getServiceColumn(serviceName: String, columnName: String) = Action { request => + val serviceColumnOpt = for { + service <- Service.findByName(serviceName) + serviceColumn <- ServiceColumn.find(service.id.get, columnName, useCache = false) + } yield serviceColumn + + optionResponse(serviceColumnOpt)(_.toJson) + } + + /** + * create + */ + + /** + * create service + * @return + */ + def createService() = Action(parse.json) { request => + val serviceTry = createServiceInner(request.body) + tryResponse(serviceTry)(_.toJson) + } + + def createServiceInner(jsValue: JsValue) = { + val (serviceName, cluster, tableName, preSplitSize, ttl, compressionAlgorithm) = requestParser.toServiceElements(jsValue) + Management.createService(serviceName, cluster, tableName, preSplitSize, ttl, compressionAlgorithm) + } + + /** + * create label + * @return + */ + def createLabel() = Action(parse.json) { request => + val ret = createLabelInner(request.body) + tryResponse(ret)(_.toJson) + } + + def createLabelInner(json: JsValue) = for { + labelArgs <- requestParser.toLabelElements(json) + label <- (Management.createLabel _).tupled(labelArgs) + } yield label + + /** + * add index + * @return + */ + def addIndex() = Action(parse.json) { request => + val ret = addIndexInner(request.body) + tryResponse(ret)(_.label + " is updated") + } + + def addIndexInner(json: JsValue) = for { + (labelName, indices) <- requestParser.toIndexElements(json) + label <- Management.addIndex(labelName, indices) + } yield label + + /** + * create service column + * @return + */ + def createServiceColumn() = Action(parse.json) { request => + val serviceColumnTry = createServiceColumnInner(request.body) + tryResponse(serviceColumnTry) { (columns: Seq[ColumnMeta]) => Json.obj("metas" -> columns.map(_.toJson)) } + } + + def createServiceColumnInner(jsValue: JsValue) = for { + (serviceName, columnName, columnType, props) <- requestParser.toServiceColumnElements(jsValue) + serviceColumn <- Management.createServiceColumn(serviceName, columnName, columnType, props) + } yield serviceColumn + + /** + * delete + */ + + /** + * delete label + * @param labelName + * @return + */ + def deleteLabel(labelName: String) = Action { request => + val deleteLabelTry = deleteLabelInner(labelName) + tryResponse(deleteLabelTry)(labelName => labelName + " is deleted") + } + + def deleteLabelInner(labelName: String) = Management.deleteLabel(labelName) + + /** + * delete servieColumn + * @param serviceName + * @param columnName + * @return + */ + def deleteServiceColumn(serviceName: String, columnName: String) = Action { request => + val serviceColumnTry = deleteServiceColumnInner(serviceName, columnName) + tryResponse(serviceColumnTry)(columnName => columnName + " is deleted") + } + + def deleteServiceColumnInner(serviceName: String, columnName: String) = + Management.deleteColumn(serviceName, columnName) + + /** + * update + */ + + /** + * add Prop to label + * @param labelName + * @return + */ + def addProp(labelName: String) = Action(parse.json) { request => + val labelMetaTry = addPropInner(labelName, request.body) + tryResponse(labelMetaTry)(_.toJson) + } + + def addPropInner(labelName: String, js: JsValue) = for { + prop <- requestParser.toPropElements(js) + labelMeta <- Management.addProp(labelName, prop) + } yield labelMeta + + /** + * add prop to serviceColumn + * @param serviceName + * @param columnName + * @return + */ + def addServiceColumnProp(serviceName: String, columnName: String) = Action(parse.json) { request => + addServiceColumnPropInner(serviceName, columnName)(request.body) match { + case None => bad(s"can`t find service with $serviceName or can`t find serviceColumn with $columnName") + case Some(m) => Ok(m.toJson).as(applicationJsonHeader) + } + } + + def addServiceColumnPropInner(serviceName: String, columnName: String)(js: JsValue) = { + for { + service <- Service.findByName(serviceName) + serviceColumn <- ServiceColumn.find(service.id.get, columnName) + prop <- requestParser.toPropElements(js).toOption + } yield { + ColumnMeta.findOrInsert(serviceColumn.id.get, prop.name, prop.defaultValue) + } + } + + /** + * add props to serviceColumn + * @param serviecName + * @param columnName + * @return + */ + def addServiceColumnProps(serviecName: String, columnName: String) = Action(parse.json) { request => + val jsObjs = request.body.asOpt[List[JsObject]].getOrElse(List.empty[JsObject]) + val newProps = for { + js <- jsObjs + newProp <- addServiceColumnPropInner(serviecName, columnName)(js) + } yield newProp + ok(s"${newProps.size} is added.") + } + + /** + * copy label + * @param oldLabelName + * @param newLabelName + * @return + */ + def copyLabel(oldLabelName: String, newLabelName: String) = Action { request => + val copyTry = Management.copyLabel(oldLabelName, newLabelName, Some(newLabelName)) + tryResponse(copyTry)(_.label + "created") + } + + /** + * rename label + * @param oldLabelName + * @param newLabelName + * @return + */ + def renameLabel(oldLabelName: String, newLabelName: String) = Action { request => + Label.findByName(oldLabelName) match { + case None => NotFound.as(applicationJsonHeader) + case Some(label) => + Management.updateLabelName(oldLabelName, newLabelName) + ok(s"Label was updated") + } + } + + /** + * update HTable for a label + * @param labelName + * @param newHTableName + * @return + */ + def updateHTable(labelName: String, newHTableName: String) = Action { request => + val updateTry = Management.updateHTable(labelName, newHTableName) + tryResponse(updateTry)(_.toString + " label(s) updated.") + } + + + case class HTableParams(cluster: String, hTableName: String, + preSplitSize: Int, hTableTTL: Option[Int], compressionAlgorithm: Option[String]) { + + override def toString(): String = { + s"""HtableParams + |-- cluster : $cluster + |-- hTableName : $hTableName + |-- preSplitSize : $preSplitSize + |-- hTableTTL : $hTableTTL + |-- compressionAlgorithm : $compressionAlgorithm + |""".stripMargin + } + } + + implicit object HTableParamsJsonConverter extends Format[HTableParams] { + def reads(json: JsValue): JsResult[HTableParams] = ( + (__ \ "cluster").read[String] and + (__ \ "hTableName").read[String] and + (__ \ "preSplitSize").read[Int] and + (__ \ "hTableTTL").readNullable[Int] and + (__ \ "compressionAlgorithm").readNullable[String])(HTableParams.apply _).reads(json) + + def writes(o: HTableParams): JsValue = Json.obj( + "cluster" -> o.cluster, + "hTableName" -> o.hTableName, + "preSplitSize" -> o.preSplitSize, + "hTableTTL" -> o.hTableTTL, + "compressionAlgorithm" -> o.compressionAlgorithm + ) + } + + implicit object JsErrorJsonWriter extends Writes[JsError] { + def writes(o: JsError): JsValue = Json.obj( + "errors" -> JsArray( + o.errors.map { + case (path, validationErrors) => Json.obj( + "path" -> Json.toJson(path.toString()), + "validationErrors" -> JsArray(validationErrors.map(validationError => Json.obj( + "message" -> JsString(validationError.message), + "args" -> JsArray(validationError.args.map(_ match { + case x: Int => JsNumber(x) + case x => JsString(x.toString) + })) + ))) + ) + } + ) + ) + } + + def createHTable() = Action { request => + + // Management.createTable(cluster, hTableName, List("e", "v"), preSplitSize, hTableTTL, compressionAlgorithm) + request.body.asJson.map(_.validate[HTableParams] match { + case JsSuccess(hTableParams, _) => { + Management.createTable(hTableParams.cluster, hTableParams.hTableName, List("e", "v"), hTableParams.preSplitSize, hTableParams.hTableTTL, hTableParams.compressionAlgorithm.getOrElse(Management.defaultCompressionAlgorithm)) + logger.info(hTableParams.toString()) + ok(s"HTable was created.") + } + case err@JsError(_) => bad(Json.toJson(err)) + }).getOrElse(bad("Invalid Json.")) + + } +}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/f2311f25/s2rest_play/app/controllers/ApplicationController.scala ---------------------------------------------------------------------- diff --git a/s2rest_play/app/controllers/ApplicationController.scala b/s2rest_play/app/controllers/ApplicationController.scala new file mode 100644 index 0000000..fefe93e --- /dev/null +++ b/s2rest_play/app/controllers/ApplicationController.scala @@ -0,0 +1,83 @@ +package controllers + +import com.kakao.s2graph.core.utils.logger +import play.api.libs.iteratee.Enumerator +import play.api.libs.json.{JsString, JsValue} +import play.api.mvc._ + +import scala.concurrent.{ExecutionContext, Future} + +object ApplicationController extends Controller { + + var isHealthy = true + var deployInfo = "" + val applicationJsonHeader = "application/json" + + val jsonParser: BodyParser[JsValue] = controllers.s2parse.json + + def updateHealthCheck(isHealthy: Boolean) = Action { request => + this.isHealthy = isHealthy + Ok(this.isHealthy + "\n") + } + + def healthCheck() = withHeader(parse.anyContent) { request => + if (isHealthy) Ok(deployInfo) + else NotFound + } + + def jsonResponse(json: JsValue, headers: (String, String)*) = + if (ApplicationController.isHealthy) { + Ok(json).as(applicationJsonHeader).withHeaders(headers: _*) + } else { + Result( + header = ResponseHeader(OK), + body = Enumerator(json.toString.getBytes()), + connection = HttpConnection.Close + ).as(applicationJsonHeader).withHeaders((CONNECTION -> "close") +: headers: _*) + } + + def toLogMessage[A](request: Request[A], result: Result)(startedAt: Long): String = { + val duration = System.currentTimeMillis() - startedAt + val isQueryRequest = result.header.headers.contains("result_size") + val resultSize = result.header.headers.getOrElse("result_size", "-1") + + try { + val body = request.body match { + case AnyContentAsJson(jsValue) => jsValue match { + case JsString(str) => str + case _ => jsValue.toString + } + case _ => request.body.toString + } + + val str = + if (isQueryRequest) + s"${request.method} ${request.uri} took ${duration} ms ${result.header.status} ${resultSize} ${body}" + else + s"${request.method} ${request.uri} took ${duration} ms ${result.header.status} ${resultSize} ${body}" + + logger.info(s"${request.method} ${request.uri} result_size: $resultSize") + + str + } finally { + /* pass */ + } + } + + def withHeaderAsync[A](bodyParser: BodyParser[A])(block: Request[A] => Future[Result])(implicit ex: ExecutionContext) = + Action.async(bodyParser) { request => + val startedAt = System.currentTimeMillis() + block(request).map { r => + logger.info(toLogMessage(request, r)(startedAt)) + r + } + } + + def withHeader[A](bodyParser: BodyParser[A])(block: Request[A] => Result) = + Action(bodyParser) { request: Request[A] => + val startedAt = System.currentTimeMillis() + val r = block(request) + logger.info(toLogMessage(request, r)(startedAt)) + r + } +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/f2311f25/s2rest_play/app/controllers/CounterController.scala ---------------------------------------------------------------------- diff --git a/s2rest_play/app/controllers/CounterController.scala b/s2rest_play/app/controllers/CounterController.scala new file mode 100644 index 0000000..c2b4dc2 --- /dev/null +++ b/s2rest_play/app/controllers/CounterController.scala @@ -0,0 +1,747 @@ +package controllers + +import com.kakao.s2graph.core.ExceptionHandler +import com.kakao.s2graph.core.ExceptionHandler.KafkaMessage +import com.kakao.s2graph.core.mysqls.Label +import config.CounterConfig +import models._ +import org.apache.kafka.clients.producer.ProducerRecord +import play.api.Play +import play.api.libs.json.Reads._ +import play.api.libs.json._ +import play.api.mvc.{Action, Controller, Request} +import s2.config.S2CounterConfig +import s2.counter.core.TimedQualifier.IntervalUnit +import s2.counter.core._ +import s2.counter.core.v1.{ExactStorageAsyncHBase, RankingStorageRedis} +import s2.counter.core.v2.{ExactStorageGraph, RankingStorageGraph} +import s2.models.Counter.ItemType +import s2.models.{Counter, CounterModel} +import s2.util.{CartesianProduct, ReduceMapValue, UnitConverter} + +import scala.concurrent.Future +import scala.util.{Failure, Success, Try} + +/** + * Created by hsleep([email protected]) on 15. 5. 22.. + */ +object CounterController extends Controller { + import play.api.libs.concurrent.Execution.Implicits.defaultContext + + val config = Play.current.configuration.underlying + val s2config = new S2CounterConfig(config) + + private val exactCounterMap = Map( + s2.counter.VERSION_1 -> new ExactCounter(config, new ExactStorageAsyncHBase(config)), + s2.counter.VERSION_2 -> new ExactCounter(config, new ExactStorageGraph(config)) + ) + private val rankingCounterMap = Map( + s2.counter.VERSION_1 -> new RankingCounter(config, new RankingStorageRedis(config)), + s2.counter.VERSION_2 -> new RankingCounter(config, new RankingStorageGraph(config)) + ) + + private val tablePrefixMap = Map ( + s2.counter.VERSION_1 -> "s2counter", + s2.counter.VERSION_2 -> "s2counter_v2" + ) + + private def exactCounter(version: Byte): ExactCounter = exactCounterMap(version) + private def rankingCounter(version: Byte): RankingCounter = rankingCounterMap(version) + + lazy val counterModel = new CounterModel(config) + + def getQueryString[T](key: String, default: String)(implicit request: Request[T]): String = { + request.getQueryString(key).getOrElse(default) + } + + implicit val counterWrites = new Writes[Counter] { + override def writes(o: Counter): JsValue = Json.obj( + "version" -> o.version.toInt, + "autoComb" -> o.autoComb, + "dimension" -> o.dimension, + "useProfile" -> o.useProfile, + "bucketImpId" -> o.bucketImpId, + "useRank" -> o.useRank, + "intervalUnit" -> o.intervalUnit, + "ttl" -> o.ttl, + "dailyTtl" -> o.dailyTtl, + "rateAction" -> o.rateActionId.flatMap { actionId => + counterModel.findById(actionId, useCache = false).map { actionPolicy => + Json.obj("service" -> actionPolicy.service, "action" -> actionPolicy.action) + } + }, + "rateBase" -> o.rateBaseId.flatMap { baseId => + counterModel.findById(baseId, useCache = false).map { basePolicy => + Json.obj("service" -> basePolicy.service, "action" -> basePolicy.action) + } + }, + "rateThreshold" -> o.rateThreshold + ) + } + + def createAction(service: String, action: String) = Action(s2parse.json) { implicit request => + counterModel.findByServiceAction(service, action, useCache = false) match { + case None => + val body = request.body + val version = (body \ "version").asOpt[Int].map(_.toByte).getOrElse(s2.counter.VERSION_2) + val autoComb = (body \ "autoComb").asOpt[Boolean].getOrElse(true) + val dimension = (body \ "dimension").asOpt[String].getOrElse("") + val useProfile = (body \ "useProfile").asOpt[Boolean].getOrElse(false) + val bucketImpId = (body \ "bucketImpId").asOpt[String] + + val useExact = (body \ "useExact").asOpt[Boolean].getOrElse(true) + val useRank = (body \ "useRank").asOpt[Boolean].getOrElse(true) + + val intervalUnit = (body \ "intervalUnit").asOpt[String] + // 2 day + val ttl = (body \ "ttl").asOpt[Int].getOrElse(2 * 24 * 60 * 60) + val dailyTtl = (body \ "dailyTtl").asOpt[Int] + val regionMultiplier = (body \ "regionMultiplier").asOpt[Int].getOrElse(1) + + val rateAction = (body \ "rateAction").asOpt[Map[String, String]] + val rateBase = (body \ "rateBase").asOpt[Map[String, String]] + val rateThreshold = (body \ "rateThreshold").asOpt[Int] + + val rateActionId = { + for { + actionMap <- rateAction + service <- actionMap.get("service") + action <- actionMap.get("action") + policy <- counterModel.findByServiceAction(service, action) + } yield { + policy.id + } + } + val rateBaseId = { + for { + actionMap <- rateBase + service <- actionMap.get("service") + action <- actionMap.get("action") + policy <- counterModel.findByServiceAction(service, action) + } yield { + policy.id + } + } + + val hbaseTable = { + Seq(tablePrefixMap(version), service, ttl) ++ dailyTtl mkString "_" + } + + // find label + val itemType = Label.findByName(action, useCache = false) match { + case Some(label) => + ItemType.withName(label.tgtColumnType.toUpperCase) + case None => + val strItemType = (body \ "itemType").asOpt[String].getOrElse("STRING") + ItemType.withName(strItemType.toUpperCase) + } + val policy = Counter(useFlag = true, version, service, action, itemType, autoComb = autoComb, dimension, + useProfile = useProfile, bucketImpId, useRank = useRank, ttl, dailyTtl, Some(hbaseTable), intervalUnit, + rateActionId, rateBaseId, rateThreshold) + + // prepare exact storage + exactCounter(version).prepare(policy) + if (useRank) { + // prepare ranking storage + rankingCounter(version).prepare(policy) + } + counterModel.createServiceAction(policy) + Ok(Json.toJson(Map("msg" -> s"created $service/$action"))) + case Some(policy) => + Ok(Json.toJson(Map("msg" -> s"already exist $service/$action"))) + } + } + + def getAction(service: String, action: String) = Action { request => + counterModel.findByServiceAction(service, action, useCache = false) match { + case Some(policy) => + Ok(Json.toJson(policy)) + case None => + NotFound(Json.toJson(Map("msg" -> s"$service.$action not found"))) + } + } + + def updateAction(service: String, action: String) = Action(s2parse.json) { request => + counterModel.findByServiceAction(service, action, useCache = false) match { + case Some(oldPolicy) => + val body = request.body + val autoComb = (body \ "autoComb").asOpt[Boolean].getOrElse(oldPolicy.autoComb) + val dimension = (body \ "dimension").asOpt[String].getOrElse(oldPolicy.dimension) + val useProfile = (body \ "useProfile").asOpt[Boolean].getOrElse(oldPolicy.useProfile) + val bucketImpId = (body \ "bucketImpId").asOpt[String] match { + case Some(s) => Some(s) + case None => oldPolicy.bucketImpId + } + + val useRank = (body \ "useRank").asOpt[Boolean].getOrElse(oldPolicy.useRank) + + val intervalUnit = (body \ "intervalUnit").asOpt[String] match { + case Some(s) => Some(s) + case None => oldPolicy.intervalUnit + } + + val rateAction = (body \ "rateAction").asOpt[Map[String, String]] + val rateBase = (body \ "rateBase").asOpt[Map[String, String]] + val rateThreshold = (body \ "rateThreshold").asOpt[Int] match { + case Some(i) => Some(i) + case None => oldPolicy.rateThreshold + } + + val rateActionId = { + for { + actionMap <- rateAction + service <- actionMap.get("service") + action <- actionMap.get("action") + policy <- counterModel.findByServiceAction(service, action, useCache = false) + } yield { + policy.id + } + } match { + case Some(i) => Some(i) + case None => oldPolicy.rateActionId + } + val rateBaseId = { + for { + actionMap <- rateBase + service <- actionMap.get("service") + action <- actionMap.get("action") + policy <- counterModel.findByServiceAction(service, action, useCache = false) + } yield { + policy.id + } + } match { + case Some(i) => Some(i) + case None => oldPolicy.rateBaseId + } + + // new counter + val policy = Counter(id = oldPolicy.id, useFlag = oldPolicy.useFlag, oldPolicy.version, service, action, oldPolicy.itemType, autoComb = autoComb, dimension, + useProfile = useProfile, bucketImpId, useRank = useRank, oldPolicy.ttl, oldPolicy.dailyTtl, oldPolicy.hbaseTable, intervalUnit, + rateActionId, rateBaseId, rateThreshold) + + counterModel.updateServiceAction(policy) + Ok(Json.toJson(Map("msg" -> s"updated $service/$action"))) + case None => + NotFound(Json.toJson(Map("msg" -> s"$service.$action not found"))) + } + } + + def prepareAction(service: String, action: String) = Action(s2parse.json) { request => + // for data migration + counterModel.findByServiceAction(service, action, useCache = false) match { + case Some(policy) => + val body = request.body + val version = (body \ "version").as[Int].toByte + if (version != policy.version) { + // change table name + val newTableName = Seq(tablePrefixMap(version), service, policy.ttl) ++ policy.dailyTtl mkString "_" + val newPolicy = policy.copy(version = version, hbaseTable = Some(newTableName)) + exactCounter(version).prepare(newPolicy) + if (newPolicy.useRank) { + rankingCounter(version).prepare(newPolicy) + } + Ok(Json.toJson(Map("msg" -> s"prepare storage v$version $service/$action"))) + } else { + Ok(Json.toJson(Map("msg" -> s"already prepared storage v$version $service/$action"))) + } + case None => + NotFound(Json.toJson(Map("msg" -> s"$service.$action not found"))) + } + } + + def deleteAction(service: String, action: String) = Action.apply { + { + for { + policy <- counterModel.findByServiceAction(service, action, useCache = false) + } yield { + Try { + exactCounter(policy.version).destroy(policy) + if (policy.useRank) { + rankingCounter(policy.version).destroy(policy) + } + counterModel.deleteServiceAction(policy) + } match { + case Success(v) => + Ok(Json.toJson(Map("msg" -> s"deleted $service/$action"))) + case Failure(ex) => + throw ex + } + } + }.getOrElse(NotFound(Json.toJson(Map("msg" -> s"$service.$action not found")))) + } + + def getExactCountAsync(service: String, action: String, item: String) = Action.async { implicit request => + val intervalUnits = getQueryString("interval", getQueryString("step", "t")).split(',').toSeq + .map(IntervalUnit.withName) + val limit = getQueryString("limit", "1").toInt + + val qsSum = request.getQueryString("sum") + + val optFrom = request.getQueryString("from").filter(!_.isEmpty).map(UnitConverter.toMillis) + val optTo = request.getQueryString("to").filter(!_.isEmpty).map(UnitConverter.toMillis) + + val limitOpt = (optFrom, optTo) match { + case (Some(_), Some(_)) => + None + case _ => + Some(limit) + } + + // find dimension + lazy val dimQueryValues = request.queryString.filterKeys { k => k.charAt(0) == ':' }.map { case (k, v) => + (k.substring(1), v.mkString(",").split(',').filter(_.nonEmpty).toSet) + } +// Logger.warn(s"$dimQueryValues") + + counterModel.findByServiceAction(service, action) match { + case Some(policy) => + val tqs = TimedQualifier.getQualifiersToLimit(intervalUnits, limit, optFrom, optTo) + val timeRange = TimedQualifier.getTimeRange(intervalUnits, limit, optFrom, optTo) + try { +// Logger.warn(s"$tqs $qsSum") + if (tqs.head.length > 1 && qsSum.isDefined) { + getDecayedCountToJs(policy, item, timeRange, dimQueryValues, qsSum).map { jsVal => + Ok(jsVal) + } + } else { + getExactCountToJs(policy, item, timeRange, limitOpt, dimQueryValues).map { jsVal => + Ok(jsVal) + } + } + } catch { + case e: Exception => + throw e +// Future.successful(BadRequest(s"$service, $action, $item")) + } + case None => + Future.successful(NotFound(Json.toJson(Map("msg" -> s"$service.$action not found")))) + } + } + + /** + * [{ + * "service": , "action", "itemIds": [], "interval": string, "limit": int, "from": ts, "to": ts, + * "dimensions": [{"key": list[String]}] + * }] + * @return + */ + private def parseExactCountParam(jsValue: JsValue) = { + val service = (jsValue \ "service").as[String] + val action = (jsValue \ "action").as[String] + val itemIds = (jsValue \ "itemIds").as[Seq[String]] + val intervals = (jsValue \ "intervals").asOpt[Seq[String]].getOrElse(Seq("t")).distinct.map(IntervalUnit.withName) + val limit = (jsValue \ "limit").asOpt[Int].getOrElse(1) + val from = (jsValue \ "from").asOpt[Long] + val to = (jsValue \ "to").asOpt[Long] + val sum = (jsValue \ "sum").asOpt[String] + val dimensions = { + for { + dimension <- (jsValue \ "dimensions").asOpt[Seq[JsObject]].getOrElse(Nil) + (k, vs) <- dimension.fields + } yield { + k -> vs.as[Seq[String]].toSet + } + }.toMap + (service, action, itemIds, intervals, limit, from, to, dimensions, sum) + } + + def getExactCountAsyncMulti = Action.async(s2parse.json) { implicit request => + val jsValue = request.body + try { + val futures = { + for { + jsObject <- jsValue.asOpt[List[JsObject]].getOrElse(Nil) + (service, action, itemIds, intervalUnits, limit, from, to, dimQueryValues, qsSum) = parseExactCountParam(jsObject) + optFrom = from.map(UnitConverter.toMillis) + optTo = to.map(UnitConverter.toMillis) + timeRange = TimedQualifier.getTimeRange(intervalUnits, limit, optFrom, optTo) + policy <- counterModel.findByServiceAction(service, action).toSeq + item <- itemIds + } yield { + val tqs = TimedQualifier.getQualifiersToLimit(intervalUnits, limit, optFrom, optTo) + val timeRange = TimedQualifier.getTimeRange(intervalUnits, limit, optFrom, optTo) + val limitOpt = (optFrom, optTo) match { + case (Some(_), Some(_)) => + None + case _ => + Some(limit) + } + +// Logger.warn(s"$item, $limit, $optFrom, $optTo, $qsSum, $tqs") + + if (tqs.head.length > 1 && qsSum.isDefined) { + getDecayedCountToJs(policy, item, timeRange, dimQueryValues, qsSum) + } else { + getExactCountToJs(policy, item, timeRange, limitOpt, dimQueryValues) + } + } + } + Future.sequence(futures).map { rets => + Ok(Json.toJson(rets)) + } + } catch { + case e: Exception => + throw e +// Future.successful(BadRequest(s"$jsValue")) + } + } + + private [controllers] def fetchedToResult(fetchedCounts: FetchedCountsGrouped, limitOpt: Option[Int]): Seq[ExactCounterIntervalItem] = { + for { + ((interval, dimKeyValues), values) <- fetchedCounts.intervalWithCountMap + } yield { + val counterItems = { + val sortedItems = values.toSeq.sortBy { case (eq, v) => -eq.tq.ts } + val limited = limitOpt match { + case Some(limit) => sortedItems.take(limit) + case None => sortedItems + } + for { + (eq, value) <- limited + } yield { + ExactCounterItem(eq.tq.ts, value, value.toDouble) + } + } + ExactCounterIntervalItem(interval.toString, dimKeyValues, counterItems) + } + }.toSeq + + private def decayedToResult(decayedCounts: DecayedCounts): Seq[ExactCounterIntervalItem] = { + for { + (eq, score) <- decayedCounts.qualifierWithCountMap + } yield { + ExactCounterIntervalItem(eq.tq.q.toString, eq.dimKeyValues, Seq(ExactCounterItem(eq.tq.ts, score.toLong, score))) + } + }.toSeq + + private def getExactCountToJs(policy: Counter, + item: String, + timeRange: Seq[(TimedQualifier, TimedQualifier)], + limitOpt: Option[Int], + dimQueryValues: Map[String, Set[String]]): Future[JsValue] = { + exactCounter(policy.version).getCountsAsync(policy, Seq(item), timeRange, dimQueryValues).map { seq => + val items = { + for { + fetched <- seq + } yield { + fetchedToResult(fetched, limitOpt) + } + }.flatten + Json.toJson(ExactCounterResult(ExactCounterResultMeta(policy.service, policy.action, item), items)) + } + } + + private def getDecayedCountToJs(policy: Counter, + item: String, + timeRange: Seq[(TimedQualifier, TimedQualifier)], + dimQueryValues: Map[String, Set[String]], + qsSum: Option[String]): Future[JsValue] = { + exactCounter(policy.version).getDecayedCountsAsync(policy, Seq(item), timeRange, dimQueryValues, qsSum).map { seq => + val decayedCounts = seq.head + val meta = ExactCounterResultMeta(policy.service, policy.action, decayedCounts.exactKey.itemKey) + val intervalItems = decayedToResult(decayedCounts) + Json.toJson(ExactCounterResult(meta, intervalItems)) + } + } + + def getRankingCountAsync(service: String, action: String) = Action.async { implicit request => + lazy val intervalUnits = getQueryString("interval", getQueryString("step", "t")).split(',').toSeq + .map(IntervalUnit.withName) + lazy val limit = getQueryString("limit", "1").toInt + lazy val kValue = getQueryString("k", "10").toInt + + lazy val qsSum = request.getQueryString("sum") + + lazy val optFrom = request.getQueryString("from").filter(!_.isEmpty).map(UnitConverter.toMillis) + lazy val optTo = request.getQueryString("to").filter(!_.isEmpty).map(UnitConverter.toMillis) + + // find dimension + lazy val dimensionMap = request.queryString.filterKeys { k => k.charAt(0) == ':' }.map { case (k, v) => + (k.substring(1), v.mkString(",").split(',').toList) + } + + val dimensions = { + for { + values <- CartesianProduct(dimensionMap.values.toList).toSeq + } yield { + dimensionMap.keys.zip(values).toMap + } + } + + counterModel.findByServiceAction(service, action) match { + case Some(policy) => + val tqs = TimedQualifier.getQualifiersToLimit(intervalUnits, limit, optTo) + val dimKeys = { + for { + dimension <- dimensions + } yield { + dimension -> tqs.map(tq => RankingKey(policy.id, policy.version, ExactQualifier(tq, dimension))) + } + } + + // if tqs has only 1 tq, do not apply sum function + try { + val rankResult = { + if (tqs.length > 1 && qsSum.isDefined) { + getSumRankCounterResultAsync(policy, dimKeys, kValue, qsSum) + } else { + // no summary + Future.successful(getRankCounterResult(policy, dimKeys, kValue)) + } + } + + rankResult.map { result => + Ok(Json.toJson(result)) + } + } catch { + case e: UnsupportedOperationException => + Future.successful(NotImplemented(Json.toJson( + Map("msg" -> e.getMessage) + ))) + case e: Throwable => + throw e + } + case None => + Future.successful(NotFound(Json.toJson(Map("msg" -> s"$service.$action not found")))) + } + } + + def deleteRankingCount(service: String, action: String) = Action.async { implicit request => + lazy val intervalUnits = getQueryString("interval", getQueryString("step", "t")).split(',').toSeq + .map(IntervalUnit.withName) + lazy val limit = getQueryString("limit", "1").toInt + + // find dimension + lazy val dimensionMap = request.queryString.filterKeys { k => k.charAt(0) == ':' }.map { case (k, v) => + (k.substring(1), v.mkString(",").split(',').toList) + } + + val dimensions = { + for { + values <- CartesianProduct(dimensionMap.values.toList).toSeq + } yield { + dimensionMap.keys.zip(values).toMap + } + } + + Future { + counterModel.findByServiceAction(service, action) match { + case Some(policy) => + val tqs = TimedQualifier.getQualifiersToLimit(intervalUnits, limit) + val keys = { + for { + dimension <- dimensions + tq <- tqs + } yield { + RankingKey(policy.id, policy.version, ExactQualifier(tq, dimension)) + } + } + + for { + key <- keys + } { + rankingCounter(policy.version).delete(key) + } + + Ok(JsObject( + Seq( + ("msg", Json.toJson(s"delete ranking in $service.$action")), + ("items", Json.toJson({ + for { + key <- keys + } yield { + s"${key.eq.tq.q}.${key.eq.tq.ts}.${key.eq.dimension}" + } + })) + ) + )) + case None => + NotFound(Json.toJson( + Map("msg" -> s"$service.$action not found") + )) + } + } + } + + val reduceRateRankingValue = new ReduceMapValue[ExactKeyTrait, RateRankingValue](RateRankingValue.reduce, RateRankingValue(-1, -1)) + + // change format + private def getDecayedCountsAsync(policy: Counter, + items: Seq[String], + timeRange: (TimedQualifier, TimedQualifier), + dimension: Map[String, String], + qsSum: Option[String]): Future[Seq[(ExactKeyTrait, Double)]] = { + exactCounter(policy.version).getDecayedCountsAsync(policy, items, Seq(timeRange), dimension.mapValues(s => Set(s)), qsSum).map { seq => + for { + DecayedCounts(exactKey, qcMap) <- seq + value <- qcMap.values + } yield { + exactKey -> value + } + } + } + + def getSumRankCounterResultAsync(policy: Counter, + dimKeys: Seq[(Map[String, String], Seq[RankingKey])], + kValue: Int, + qsSum: Option[String]): Future[RankCounterResult] = { + val futures = { + for { + (dimension, keys) <- dimKeys + } yield { + val tqs = keys.map(rk => rk.eq.tq) + val (tqFrom, tqTo) = (tqs.last, tqs.head) + val items = rankingCounter(policy.version).getAllItems(keys, kValue) +// Logger.warn(s"item count: ${items.length}") + val future = { + if (policy.isRateCounter) { + val actionPolicy = policy.rateActionId.flatMap(counterModel.findById(_)).get + val basePolicy = policy.rateBaseId.flatMap(counterModel.findById(_)).get + + val futureAction = getDecayedCountsAsync(actionPolicy, items, (tqFrom, tqTo), dimension, qsSum).map { seq => + seq.map { case (k, score) => + ExactKey(policy, k.itemKey, checkItemType = false) -> RateRankingValue(score, -1) + }.toMap + } + val futureBase = getDecayedCountsAsync(basePolicy, items, (tqFrom, tqTo), dimension, qsSum).map { seq => + seq.map { case (k, score) => + ExactKey(policy, k.itemKey, checkItemType = false) -> RateRankingValue(-1, score) + }.toMap + } + futureAction.zip(futureBase).map { case (actionScores, baseScores) => + reduceRateRankingValue(actionScores, baseScores).map { case (k, rrv) => +// Logger.warn(s"$k -> $rrv") + k -> rrv.rankingValue.score + }.toSeq + } + } + else if (policy.isTrendCounter) { + val actionPolicy = policy.rateActionId.flatMap(counterModel.findById(_)).get + val basePolicy = policy.rateBaseId.flatMap(counterModel.findById(_)).get + + val futureAction = getDecayedCountsAsync(actionPolicy, items, (tqFrom, tqTo), dimension, qsSum).map { seq => + seq.map { case (k, score) => + ExactKey(policy, k.itemKey, checkItemType = false) -> RateRankingValue(score, -1) + }.toMap + } + val futureBase = getDecayedCountsAsync(basePolicy, items, (tqFrom.add(-1), tqTo.add(-1)), dimension, qsSum).map { seq => + seq.map { case (k, score) => + ExactKey(policy, k.itemKey, checkItemType = false) -> RateRankingValue(-1, score) + }.toMap + } + futureAction.zip(futureBase).map { case (actionScores, baseScores) => + reduceRateRankingValue(actionScores, baseScores).map { case (k, rrv) => +// Logger.warn(s"$k -> $rrv") + k -> rrv.rankingValue.score + }.toSeq + } + } + else { + getDecayedCountsAsync(policy, items, (tqFrom, tqTo), dimension, qsSum) + } + } + future.map { keyWithScore => + val ranking = keyWithScore.sortBy(-_._2).take(kValue) + val rankCounterItems = { + for { + idx <- ranking.indices + (exactKey, score) = ranking(idx) + } yield { + val realId = policy.itemType match { + case ItemType.BLOB => exactCounter(policy.version).getBlobValue(policy, exactKey.itemKey) + .getOrElse(throw new Exception(s"not found blob id. ${policy.service}.${policy.action} ${exactKey.itemKey}")) + case _ => exactKey.itemKey + } + RankCounterItem(idx + 1, realId, score) + } + } + + val eq = ExactQualifier(tqFrom, dimension) + RankCounterDimensionItem(eq.tq.q.toString, eq.tq.ts, eq.dimension, -1, rankCounterItems) + } + } + } + + Future.sequence(futures).map { dimensionResultList => + RankCounterResult(RankCounterResultMeta(policy.service, policy.action), dimensionResultList) + } + } + + def getRankCounterResult(policy: Counter, dimKeys: Seq[(Map[String, String], Seq[RankingKey])], kValue: Int): RankCounterResult = { + val dimensionResultList = { + for { + (dimension, keys) <- dimKeys + key <- keys + } yield { + val rankingValue = rankingCounter(policy.version).getTopK(key, kValue) + val ranks = { + for { + rValue <- rankingValue.toSeq + idx <- rValue.values.indices + rank = idx + 1 + } yield { + val (id, score) = rValue.values(idx) + val realId = policy.itemType match { + case ItemType.BLOB => + exactCounter(policy.version) + .getBlobValue(policy, id) + .getOrElse(throw new Exception(s"not found blob id. ${policy.service}.${policy.action} $id")) + case _ => id + } + RankCounterItem(rank, realId, score) + } + } + val eq = key.eq + val tq = eq.tq + RankCounterDimensionItem(tq.q.toString, tq.ts, eq.dimension, rankingValue.map(v => v.totalScore).getOrElse(0d), ranks) + } + } + + RankCounterResult(RankCounterResultMeta(policy.service, policy.action), dimensionResultList) + } + + type Record = ProducerRecord[String, String] + + def incrementCount(service: String, action: String, item: String) = Action.async(s2parse.json) { request => + Future { + /** + * { + * timestamp: Long + * property: {} + * value: Int + * } + */ + lazy val metaMap = Map("service" -> service, "action" -> action, "item" -> item) + counterModel.findByServiceAction(service, action).map { policy => + val body = request.body + try { + val ts = (body \ "timestamp").asOpt[Long].getOrElse(System.currentTimeMillis()).toString + val dimension = (body \ "dimension").asOpt[JsValue].getOrElse(Json.obj()) + val property = (body \ "property").asOpt[JsValue].getOrElse(Json.obj()) + + val msg = List(ts, service, action, item, dimension, property).mkString("\t") + + // produce to kafka + // hash partitioner by key + ExceptionHandler.enqueue(KafkaMessage(new Record(CounterConfig.KAFKA_TOPIC_COUNTER, s"$ts.$item", msg))) + + Ok(Json.toJson( + Map( + "meta" -> metaMap + ) + )) + } + catch { + case e: JsResultException => + BadRequest(Json.toJson( + Map("msg" -> s"need timestamp.") + )) + } + }.getOrElse { + NotFound(Json.toJson( + Map("msg" -> s"$service.$action not found") + )) + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/f2311f25/s2rest_play/app/controllers/EdgeController.scala ---------------------------------------------------------------------- diff --git a/s2rest_play/app/controllers/EdgeController.scala b/s2rest_play/app/controllers/EdgeController.scala new file mode 100644 index 0000000..32af6c3 --- /dev/null +++ b/s2rest_play/app/controllers/EdgeController.scala @@ -0,0 +1,222 @@ +package controllers + +import actors.QueueActor +import com.kakao.s2graph.core.GraphExceptions.BadQueryException +import com.kakao.s2graph.core._ +import com.kakao.s2graph.core.mysqls.{LabelMeta, Label} +import com.kakao.s2graph.core.rest.RequestParser +import com.kakao.s2graph.core.types.LabelWithDirection +import com.kakao.s2graph.core.utils.logger +import config.Config +import org.apache.kafka.clients.producer.ProducerRecord +import play.api.libs.json._ +import play.api.mvc.{Controller, Result} + +import scala.collection.Seq +import scala.concurrent.Future +import scala.util.{Failure, Success} + +object EdgeController extends Controller { + + import ExceptionHandler._ + import controllers.ApplicationController._ + import play.api.libs.concurrent.Execution.Implicits._ + + private val s2: Graph = com.kakao.s2graph.rest.Global.s2graph + private val requestParser: RequestParser = com.kakao.s2graph.rest.Global.s2parser + + def tryMutates(jsValue: JsValue, operation: String, withWait: Boolean = false): Future[Result] = { + if (!Config.IS_WRITE_SERVER) Future.successful(Unauthorized) + + else { + try { + logger.debug(s"$jsValue") + val edges = requestParser.toEdges(jsValue, operation) + for (edge <- edges) { + if (edge.isAsync) + ExceptionHandler.enqueue(toKafkaMessage(Config.KAFKA_LOG_TOPIC_ASYNC, edge, None)) + else + ExceptionHandler.enqueue(toKafkaMessage(Config.KAFKA_LOG_TOPIC, edge, None)) + } + + val edgesToStore = edges.filterNot(e => e.isAsync) + + if (withWait) { + val rets = s2.mutateEdges(edgesToStore, withWait = true) + rets.map(Json.toJson(_)).map(jsonResponse(_)) + } else { + val rets = edgesToStore.map { edge => QueueActor.router ! edge; true } + Future.successful(jsonResponse(Json.toJson(rets))) + } + } catch { + case e: GraphExceptions.JsonParseException => Future.successful(BadRequest(s"$e")) + case e: Exception => + logger.error(s"mutateAndPublish: $e", e) + Future.successful(InternalServerError(s"${e.getStackTrace}")) + } + } + } + + def mutateAndPublish(str: String, withWait: Boolean = false): Future[Result] = { + if (!Config.IS_WRITE_SERVER) Future.successful(Unauthorized) + + logger.debug(s"$str") + val edgeStrs = str.split("\\n") + + var vertexCnt = 0L + var edgeCnt = 0L + try { + val elements = + for (edgeStr <- edgeStrs; str <- GraphUtil.parseString(edgeStr); element <- Graph.toGraphElement(str)) yield { + element match { + case v: Vertex => vertexCnt += 1 + case e: Edge => edgeCnt += 1 + } + if (element.isAsync) { + ExceptionHandler.enqueue(toKafkaMessage(Config.KAFKA_LOG_TOPIC_ASYNC, element, Some(str))) + } else { + ExceptionHandler.enqueue(toKafkaMessage(Config.KAFKA_LOG_TOPIC, element, Some(str))) + } + element + } + + //FIXME: + val elementsToStore = elements.filterNot(e => e.isAsync) + if (withWait) { + val rets = s2.mutateElements(elementsToStore, withWait) + rets.map(Json.toJson(_)).map(jsonResponse(_)) + } else { + val rets = elementsToStore.map { element => QueueActor.router ! element; true } + Future.successful(jsonResponse(Json.toJson(rets))) + } + + + } catch { + case e: GraphExceptions.JsonParseException => Future.successful(BadRequest(s"$e")) + case e: Throwable => + logger.error(s"mutateAndPublish: $e", e) + Future.successful(InternalServerError(s"${e.getStackTrace}")) + } + } + + def mutateBulk() = withHeaderAsync(parse.text) { request => + mutateAndPublish(request.body) + } + + def inserts() = withHeaderAsync(jsonParser) { request => + tryMutates(request.body, "insert") + } + + def insertsWithWait() = withHeaderAsync(jsonParser) { request => + tryMutates(request.body, "insert", withWait = true) + } + + def insertsBulk() = withHeaderAsync(jsonParser) { request => + tryMutates(request.body, "insertBulk") + } + + def deletes() = withHeaderAsync(jsonParser) { request => + tryMutates(request.body, "delete") + } + + def deletesWithWait() = withHeaderAsync(jsonParser) { request => + tryMutates(request.body, "delete", withWait = true) + } + + def updates() = withHeaderAsync(jsonParser) { request => + tryMutates(request.body, "update") + } + + def updatesWithWait() = withHeaderAsync(jsonParser) { request => + tryMutates(request.body, "update", withWait = true) + } + + def increments() = withHeaderAsync(jsonParser) { request => + tryMutates(request.body, "increment") + } + + def incrementCounts() = withHeaderAsync(jsonParser) { request => + val jsValue = request.body + val edges = requestParser.toEdges(jsValue, "incrementCount") + s2.incrementCounts(edges).map { results => + val json = results.map { case (isSuccess, resultCount) => + Json.obj("success" -> isSuccess, "result" -> resultCount) + } + + jsonResponse(Json.toJson(json)) + } + } + + def deleteAll() = withHeaderAsync(jsonParser) { request => + deleteAllInner(request.body, withWait = false) + } + + def deleteAllInner(jsValue: JsValue, withWait: Boolean) = { + val deleteResults = Future.sequence(jsValue.as[Seq[JsValue]] map { json => + + val labelName = (json \ "label").as[String] + val labels = Label.findByName(labelName).map { l => Seq(l) }.getOrElse(Nil) + val direction = (json \ "direction").asOpt[String].getOrElse("out") + + val ids = (json \ "ids").asOpt[List[JsValue]].getOrElse(Nil) + val ts = (json \ "timestamp").asOpt[Long].getOrElse(System.currentTimeMillis()) + val vertices = requestParser.toVertices(labelName, direction, ids) + + /** logging for delete all request */ + + val kafkaMessages = for { + id <- ids + label <- labels + } yield { + val tsv = Seq(ts, "deleteAll", "e", id, id, label.label, "{}", direction).mkString("\t") + val topic = if (label.isAsync) Config.KAFKA_LOG_TOPIC_ASYNC else Config.KAFKA_LOG_TOPIC + val kafkaMsg = KafkaMessage(new ProducerRecord[Key, Val](topic, null, tsv)) + kafkaMsg + } + ExceptionHandler.enqueues(kafkaMessages) + + val future = s2.deleteAllAdjacentEdges(vertices.toList, labels, GraphUtil.directions(direction), ts) + future.onFailure { case ex: Exception => + logger.error(s"[Error]: deleteAllInner failed.", ex) + val kafkaMessages = for { + id <- ids + label <- labels + } yield { + val tsv = Seq(ts, "deleteAll", "e", id, id, label.label, "{}", direction).mkString("\t") + val topic = failTopic + val kafkaMsg = KafkaMessage(new ProducerRecord[Key, Val](topic, null, tsv)) + kafkaMsg + } + ExceptionHandler.enqueues(kafkaMessages) + throw ex + } + if (withWait) { + future.map { ret => + if (!ret) { + logger.error(s"[Error]: deleteAllInner failed.") + val kafkaMessages = for { + id <- ids + label <- labels + } yield { + val tsv = Seq(ts, "deleteAll", "e", id, id, label.label, "{}", direction).mkString("\t") + val topic = failTopic + val kafkaMsg = KafkaMessage(new ProducerRecord[Key, Val](topic, null, tsv)) + kafkaMsg + } + ExceptionHandler.enqueues(kafkaMessages) + false + } else { + true + } + } + } else { + Future.successful(true) + } + }) + + deleteResults.map { rst => + logger.debug(s"deleteAllInner: $rst") + Ok(s"deleted... ${rst.toString()}") + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/f2311f25/s2rest_play/app/controllers/ExperimentController.scala ---------------------------------------------------------------------- diff --git a/s2rest_play/app/controllers/ExperimentController.scala b/s2rest_play/app/controllers/ExperimentController.scala new file mode 100644 index 0000000..5ae2379 --- /dev/null +++ b/s2rest_play/app/controllers/ExperimentController.scala @@ -0,0 +1,114 @@ +package controllers + + +import java.net.URL + +import com.kakao.s2graph.core.mysqls._ +import com.kakao.s2graph.core.rest.RequestParser +import com.kakao.s2graph.core.utils.logger +import play.api.Play.current +import play.api.libs.json.{JsObject, JsString, JsValue, Json} +import play.api.libs.ws.WS +import play.api.mvc._ + +import scala.concurrent.ExecutionContext.Implicits.global +import scala.concurrent.Future + +object ExperimentController extends Controller { + val impressionKey = "S2-Impression-Id" + + import ApplicationController._ + + def experiment(accessToken: String, experimentName: String, uuid: String) = withHeaderAsync(parse.anyContent) { request => + val bucketOpt = for { + service <- Service.findByAccessToken(accessToken) + experiment <- Experiment.findBy(service.id.get, experimentName) + bucket <- experiment.findBucket(uuid) + } yield bucket + + bucketOpt match { + case None => Future.successful(NotFound("bucket is not found.")) + case Some(bucket) => + try { + if (bucket.isGraphQuery) buildRequestInner(request, bucket, uuid) + else buildRequest(request, bucket, uuid) + } catch { + case e: Exception => + logger.error(e.toString()) + Future.successful(BadRequest(s"wrong or missing template parameter: ${e.getMessage}")) + } + } + } + + def makeRequestJson(requestKeyJsonOpt: Option[JsValue], bucket: Bucket, uuid: String): JsValue = { + var body = bucket.requestBody.replace("#uuid", uuid) + + for { + requestKeyJson <- requestKeyJsonOpt + jsObj <- requestKeyJson.asOpt[JsObject] + (key, value) <- jsObj.fieldSet + } { + val replacement = value match { + case JsString(s) => s + case _ => value.toString + } + body = body.replace(key, replacement) + } + + Json.parse(body) + } + + private def buildRequestInner(request: Request[AnyContent], bucket: Bucket, uuid: String): Future[Result] = { + if (bucket.isEmpty) Future.successful(Ok(Json.obj("isEmpty" -> true)).withHeaders(impressionKey -> bucket.impressionId)) + else { + val jsonBody = makeRequestJson(request.body.asJson, bucket, uuid) + val url = new URL(bucket.apiPath) + val path = url.getPath() + + // dummy log for sampling + val experimentLog = s"POST $path took -1 ms 200 -1 $jsonBody" + logger.info(experimentLog) + + val response = path match { + case "/graphs/getEdges" => controllers.QueryController.getEdgesInner(jsonBody) + case "/graphs/getEdges/grouped" => controllers.QueryController.getEdgesWithGroupingInner(jsonBody) + case "/graphs/getEdgesExcluded" => controllers.QueryController.getEdgesExcludedInner(jsonBody) + case "/graphs/getEdgesExcluded/grouped" => controllers.QueryController.getEdgesExcludedWithGroupingInner(jsonBody) + case "/graphs/checkEdges" => controllers.QueryController.checkEdgesInner(jsonBody) + case "/graphs/getEdgesGrouped" => controllers.QueryController.getEdgesGroupedInner(jsonBody) + case "/graphs/getEdgesGroupedExcluded" => controllers.QueryController.getEdgesGroupedExcludedInner(jsonBody) + case "/graphs/getEdgesGroupedExcludedFormatted" => controllers.QueryController.getEdgesGroupedExcludedFormattedInner(jsonBody) + } + response.map { r => r.withHeaders(impressionKey -> bucket.impressionId) } + } + } + + private def toSimpleMap(map: Map[String, Seq[String]]): Map[String, String] = { + for { + (k, vs) <- map + headVal <- vs.headOption + } yield { + k -> headVal + } + } + + private def buildRequest(request: Request[AnyContent], bucket: Bucket, uuid: String): Future[Result] = { + val jsonBody = makeRequestJson(request.body.asJson, bucket, uuid) + + val url = bucket.apiPath + val headers = request.headers.toSimpleMap.toSeq + val verb = bucket.httpVerb.toUpperCase + val qs = toSimpleMap(request.queryString).toSeq + + val ws = WS.url(url) + .withMethod(verb) + .withBody(jsonBody) + .withHeaders(headers: _*) + .withQueryString(qs: _*) + + ws.stream().map { + case (proxyResponse, proxyBody) => + Result(ResponseHeader(proxyResponse.status, proxyResponse.headers.mapValues(_.toList.head)), proxyBody).withHeaders(impressionKey -> bucket.impressionId) + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/f2311f25/s2rest_play/app/controllers/JsonBodyParser.scala ---------------------------------------------------------------------- diff --git a/s2rest_play/app/controllers/JsonBodyParser.scala b/s2rest_play/app/controllers/JsonBodyParser.scala new file mode 100644 index 0000000..3e3d40c --- /dev/null +++ b/s2rest_play/app/controllers/JsonBodyParser.scala @@ -0,0 +1,73 @@ +package controllers + +import com.kakao.s2graph.core.utils.logger +import play.api.Play +import play.api.libs.iteratee.Iteratee +import play.api.libs.json.{JsValue, Json} +import play.api.mvc._ + +import scala.concurrent.Future +import scala.util.control.NonFatal + +/** + * Created by hsleep([email protected]) on 15. 9. 1.. + */ + +object s2parse extends BodyParsers { + + import parse._ + + val defaultMaxTextLength = 1024 * 512 + val defaultMaxJsonLength = 1024 * 512 + +// def json: BodyParser[JsValue] = json(DEFAULT_MAX_TEXT_LENGTH) + def json: BodyParser[JsValue] = json(defaultMaxTextLength) + + def json(maxLength: Int): BodyParser[JsValue] = when( + _.contentType.exists(m => m.equalsIgnoreCase("text/json") || m.equalsIgnoreCase("application/json")), + tolerantJson(maxLength), + createBadResult("Expecting text/json or application/json body") + ) + + def tolerantJson(maxLength: Int): BodyParser[JsValue] = + tolerantBodyParser[JsValue]("json", maxLength, "Invalid Json") { (request, bytes) => + // Encoding notes: RFC 4627 requires that JSON be encoded in Unicode, and states that whether that's + // UTF-8, UTF-16 or UTF-32 can be auto detected by reading the first two bytes. So we ignore the declared + // charset and don't decode, we passing the byte array as is because Jackson supports auto detection. + Json.parse(bytes) + } + + private def tolerantBodyParser[A](name: String, maxLength: Int, errorMessage: String)(parser: (RequestHeader, Array[Byte]) => A): BodyParser[A] = + BodyParser(name + ", maxLength=" + maxLength) { request => + import play.api.libs.iteratee.Execution.Implicits.trampoline + import play.api.libs.iteratee.Traversable + + import scala.util.control.Exception._ + + val bodyParser: Iteratee[Array[Byte], Either[Result, Either[Future[Result], A]]] = + Traversable.takeUpTo[Array[Byte]](maxLength).transform( + Iteratee.consume[Array[Byte]]().map { bytes => + allCatch[A].either { + parser(request, bytes) + }.left.map { + case NonFatal(e) => + val txt = new String(bytes) + logger.error(s"$errorMessage: $txt", e) + createBadResult(s"$errorMessage: $e")(request) + case t => throw t + } + } + ).flatMap(Iteratee.eofOrElse(Results.EntityTooLarge)) + + bodyParser.mapM { + case Left(tooLarge) => Future.successful(Left(tooLarge)) + case Right(Left(badResult)) => badResult.map(Left.apply) + case Right(Right(body)) => Future.successful(Right(body)) + } + } + + private def createBadResult(msg: String): RequestHeader => Future[Result] = { request => + Play.maybeApplication.map(_.global.onBadRequest(request, msg)) + .getOrElse(Future.successful(Results.BadRequest)) + } +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/f2311f25/s2rest_play/app/controllers/PublishController.scala ---------------------------------------------------------------------- diff --git a/s2rest_play/app/controllers/PublishController.scala b/s2rest_play/app/controllers/PublishController.scala new file mode 100644 index 0000000..b1495d5 --- /dev/null +++ b/s2rest_play/app/controllers/PublishController.scala @@ -0,0 +1,54 @@ +package controllers + +import com.kakao.s2graph.core.ExceptionHandler +import config.Config +import org.apache.kafka.clients.producer.ProducerRecord +import play.api.mvc._ + +import scala.concurrent.Future + +object PublishController extends Controller { + + import ApplicationController._ + import ExceptionHandler._ + import play.api.libs.concurrent.Execution.Implicits._ + + /** + * never check validation on string. just redirect strings to kafka. + */ + val serviceNotExistException = new RuntimeException(s"service is not created in s2graph. create service first.") + + // private def toService(topic: String): String = { + // Service.findByName(topic).map(service => s"${service.serviceName}-${Config.PHASE}").getOrElse(throw serviceNotExistException) + // } + def publishOnly(topic: String) = withHeaderAsync(parse.text) { request => + if (!Config.IS_WRITE_SERVER) Future.successful(UNAUTHORIZED) + // val kafkaTopic = toService(topic) + val strs = request.body.split("\n") + strs.foreach(str => { + val keyedMessage = new ProducerRecord[Key, Val](Config.KAFKA_LOG_TOPIC, str) + // val keyedMessage = new ProducerRecord[Key, Val](kafkaTopic, s"$str") + // logger.debug(s"$kafkaTopic, $str") + ExceptionHandler.enqueue(KafkaMessage(keyedMessage)) + }) + Future.successful( + Ok("publish success.\n").withHeaders(CONNECTION -> "Keep-Alive", "Keep-Alive" -> "timeout=10, max=10") + ) + // try { + // + // } catch { + // case e: Exception => Future.successful(BadRequest(e.getMessage)) + // } + } + + def publish(topic: String) = publishOnly(topic) + + // def mutateBulk(topic: String) = Action.async(parse.text) { request => + // EdgeController.mutateAndPublish(Config.KAFKA_LOG_TOPIC, Config.KAFKA_FAIL_TOPIC, request.body).map { result => + // result.withHeaders(CONNECTION -> "Keep-Alive", "Keep-Alive" -> "timeout=10, max=10") + // } + // } + def mutateBulk(topic: String) = withHeaderAsync(parse.text) { request => + EdgeController.mutateAndPublish(request.body) + } +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/f2311f25/s2rest_play/app/controllers/QueryController.scala ---------------------------------------------------------------------- diff --git a/s2rest_play/app/controllers/QueryController.scala b/s2rest_play/app/controllers/QueryController.scala new file mode 100644 index 0000000..480c142 --- /dev/null +++ b/s2rest_play/app/controllers/QueryController.scala @@ -0,0 +1,311 @@ +package controllers + + +import com.kakao.s2graph.core.GraphExceptions.BadQueryException +import com.kakao.s2graph.core._ +import com.kakao.s2graph.core.mysqls._ +import com.kakao.s2graph.core.types.{LabelWithDirection, VertexId} +import com.kakao.s2graph.core.utils.logger +import config.Config +import play.api.libs.json.{JsArray, JsObject, JsValue, Json} +import play.api.mvc.{Action, Controller, Result} + +import scala.concurrent._ +import scala.language.postfixOps +import scala.util.Try + +object QueryController extends Controller { + + import ApplicationController._ + import play.api.libs.concurrent.Execution.Implicits.defaultContext + + private val s2: Graph = com.kakao.s2graph.rest.Global.s2graph + private val requestParser = com.kakao.s2graph.rest.Global.s2parser + + private def badQueryExceptionResults(ex: Exception) = Future.successful(BadRequest(Json.obj("message" -> ex.getMessage)).as(applicationJsonHeader)) + + private def errorResults = Future.successful(Ok(PostProcess.timeoutResults).as(applicationJsonHeader)) + + def getEdges() = withHeaderAsync(jsonParser) { request => + getEdgesInner(request.body) + } + + def getEdgesExcluded = withHeaderAsync(jsonParser) { request => + getEdgesExcludedInner(request.body) + } + + private def eachQuery(post: (Seq[QueryRequestWithResult], Seq[QueryRequestWithResult]) => JsValue)(q: Query): Future[JsValue] = { + val filterOutQueryResultsLs = q.filterOutQuery match { + case Some(filterOutQuery) => s2.getEdges(filterOutQuery) + case None => Future.successful(Seq.empty) + } + + for { + queryResultsLs <- s2.getEdges(q) + filterOutResultsLs <- filterOutQueryResultsLs + } yield { + val json = post(queryResultsLs, filterOutResultsLs) + json + } + } + + private def calcSize(js: JsValue): Int = js match { + case JsObject(obj) => (js \ "size").asOpt[Int].getOrElse(0) + case JsArray(seq) => seq.map(js => (js \ "size").asOpt[Int].getOrElse(0)).sum + case _ => 0 + } + + private def getEdgesAsync(jsonQuery: JsValue) + (post: (Seq[QueryRequestWithResult], Seq[QueryRequestWithResult]) => JsValue): Future[Result] = { + if (!Config.IS_QUERY_SERVER) Unauthorized.as(applicationJsonHeader) + val fetch = eachQuery(post) _ +// logger.info(jsonQuery) + + Try { + val future = jsonQuery match { + case JsArray(arr) => Future.traverse(arr.map(requestParser.toQuery(_)))(fetch).map(JsArray) + case obj@JsObject(_) => fetch(requestParser.toQuery(obj)) + case _ => throw BadQueryException("Cannot support") + } + + future map { json => jsonResponse(json, "result_size" -> calcSize(json).toString) } + + } recover { + case e: BadQueryException => + logger.error(s"$jsonQuery, $e", e) + badQueryExceptionResults(e) + case e: Exception => + logger.error(s"$jsonQuery, $e", e) + errorResults + } get + } + + @deprecated(message = "deprecated", since = "0.2") + private def getEdgesExcludedAsync(jsonQuery: JsValue) + (post: (Seq[QueryRequestWithResult], Seq[QueryRequestWithResult]) => JsValue): Future[Result] = { + + if (!Config.IS_QUERY_SERVER) Unauthorized.as(applicationJsonHeader) + + Try { + val q = requestParser.toQuery(jsonQuery) + val filterOutQuery = Query(q.vertices, Vector(q.steps.last)) + + val fetchFuture = s2.getEdges(q) + val excludeFuture = s2.getEdges(filterOutQuery) + + for { + queryResultLs <- fetchFuture + exclude <- excludeFuture + } yield { + val json = post(queryResultLs, exclude) + jsonResponse(json, "result_size" -> calcSize(json).toString) + } + } recover { + case e: BadQueryException => + logger.error(s"$jsonQuery, $e", e) + badQueryExceptionResults(e) + case e: Exception => + logger.error(s"$jsonQuery, $e", e) + errorResults + } get + } + + def getEdgesInner(jsonQuery: JsValue) = { + getEdgesAsync(jsonQuery)(PostProcess.toSimpleVertexArrJson) + } + + def getEdgesExcludedInner(jsValue: JsValue) = { + getEdgesExcludedAsync(jsValue)(PostProcess.toSimpleVertexArrJson) + } + + def getEdgesWithGrouping() = withHeaderAsync(jsonParser) { request => + getEdgesWithGroupingInner(request.body) + } + + def getEdgesWithGroupingInner(jsonQuery: JsValue) = { + getEdgesAsync(jsonQuery)(PostProcess.summarizeWithListFormatted) + } + + def getEdgesExcludedWithGrouping() = withHeaderAsync(jsonParser) { request => + getEdgesExcludedWithGroupingInner(request.body) + } + + def getEdgesExcludedWithGroupingInner(jsonQuery: JsValue) = { + getEdgesExcludedAsync(jsonQuery)(PostProcess.summarizeWithListExcludeFormatted) + } + + def getEdgesGroupedInner(jsonQuery: JsValue) = { + getEdgesAsync(jsonQuery)(PostProcess.summarizeWithList) + } + + @deprecated(message = "deprecated", since = "0.2") + def getEdgesGrouped() = withHeaderAsync(jsonParser) { request => + getEdgesGroupedInner(request.body) + } + + @deprecated(message = "deprecated", since = "0.2") + def getEdgesGroupedExcluded() = withHeaderAsync(jsonParser) { request => + getEdgesGroupedExcludedInner(request.body) + } + + @deprecated(message = "deprecated", since = "0.2") + def getEdgesGroupedExcludedInner(jsonQuery: JsValue): Future[Result] = { + if (!Config.IS_QUERY_SERVER) Unauthorized.as(applicationJsonHeader) + + Try { + val q = requestParser.toQuery(jsonQuery) + val filterOutQuery = Query(q.vertices, Vector(q.steps.last)) + + val fetchFuture = s2.getEdges(q) + val excludeFuture = s2.getEdges(filterOutQuery) + + for { + queryResultLs <- fetchFuture + exclude <- excludeFuture + } yield { + val json = PostProcess.summarizeWithListExclude(queryResultLs, exclude) + jsonResponse(json, "result_size" -> calcSize(json).toString) + } + } recover { + case e: BadQueryException => + logger.error(s"$jsonQuery, $e", e) + badQueryExceptionResults(e) + case e: Exception => + logger.error(s"$jsonQuery, $e", e) + errorResults + } get + } + + @deprecated(message = "deprecated", since = "0.2") + def getEdgesGroupedExcludedFormatted = withHeaderAsync(jsonParser) { request => + getEdgesGroupedExcludedFormattedInner(request.body) + } + + @deprecated(message = "deprecated", since = "0.2") + def getEdgesGroupedExcludedFormattedInner(jsonQuery: JsValue): Future[Result] = { + if (!Config.IS_QUERY_SERVER) Unauthorized.as(applicationJsonHeader) + + Try { + val q = requestParser.toQuery(jsonQuery) + val filterOutQuery = Query(q.vertices, Vector(q.steps.last)) + + val fetchFuture = s2.getEdges(q) + val excludeFuture = s2.getEdges(filterOutQuery) + + for { + queryResultLs <- fetchFuture + exclude <- excludeFuture + } yield { + val json = PostProcess.summarizeWithListExcludeFormatted(queryResultLs, exclude) + jsonResponse(json, "result_size" -> calcSize(json).toString) + } + } recover { + case e: BadQueryException => + logger.error(s"$jsonQuery, $e", e) + badQueryExceptionResults(e) + case e: Exception => + logger.error(s"$jsonQuery, $e", e) + errorResults + } get + } + + def getEdge(srcId: String, tgtId: String, labelName: String, direction: String) = Action.async { request => + if (!Config.IS_QUERY_SERVER) Future.successful(Unauthorized) + val params = Json.arr(Json.obj("label" -> labelName, "direction" -> direction, "from" -> srcId, "to" -> tgtId)) + checkEdgesInner(params) + } + + /** + * Vertex + */ + + def checkEdgesInner(jsValue: JsValue) = { + try { + val params = jsValue.as[List[JsValue]] + var isReverted = false + val labelWithDirs = scala.collection.mutable.HashSet[LabelWithDirection]() + val quads = for { + param <- params + labelName <- (param \ "label").asOpt[String] + direction <- GraphUtil.toDir((param \ "direction").asOpt[String].getOrElse("out")) + label <- Label.findByName(labelName) + srcId <- requestParser.jsValueToInnerVal((param \ "from").as[JsValue], label.srcColumnWithDir(direction.toInt).columnType, label.schemaVersion) + tgtId <- requestParser.jsValueToInnerVal((param \ "to").as[JsValue], label.tgtColumnWithDir(direction.toInt).columnType, label.schemaVersion) + } yield { + val labelWithDir = LabelWithDirection(label.id.get, direction) + labelWithDirs += labelWithDir + val (src, tgt, dir) = if (direction == 1) { + isReverted = true + (Vertex(VertexId(label.tgtColumnWithDir(direction.toInt).id.get, tgtId)), + Vertex(VertexId(label.srcColumnWithDir(direction.toInt).id.get, srcId)), 0) + } else { + (Vertex(VertexId(label.srcColumnWithDir(direction.toInt).id.get, srcId)), + Vertex(VertexId(label.tgtColumnWithDir(direction.toInt).id.get, tgtId)), 0) + } + + // logger.debug(s"SrcVertex: $src") + // logger.debug(s"TgtVertex: $tgt") + // logger.debug(s"direction: $dir") + (src, tgt, QueryParam(LabelWithDirection(label.id.get, dir))) + } + + s2.checkEdges(quads).map { case queryRequestWithResultLs => + val edgeJsons = for { + queryRequestWithResult <- queryRequestWithResultLs + (queryRequest, queryResult) = QueryRequestWithResult.unapply(queryRequestWithResult).get + edgeWithScore <- queryResult.edgeWithScoreLs + (edge, score) = EdgeWithScore.unapply(edgeWithScore).get + convertedEdge = if (isReverted) edge.duplicateEdge else edge + edgeJson = PostProcess.edgeToJson(convertedEdge, score, queryRequest.query, queryRequest.queryParam) + } yield Json.toJson(edgeJson) + + val json = Json.toJson(edgeJsons) + jsonResponse(json, "result_size" -> edgeJsons.size.toString) + } + } catch { + case e: Exception => + logger.error(s"$jsValue, $e", e) + errorResults + } + } + + def checkEdges() = withHeaderAsync(jsonParser) { request => + if (!Config.IS_QUERY_SERVER) Future.successful(Unauthorized) + + checkEdgesInner(request.body) + } + + def getVertices() = withHeaderAsync(jsonParser) { request => + getVerticesInner(request.body) + } + + def getVerticesInner(jsValue: JsValue) = { + if (!Config.IS_QUERY_SERVER) Unauthorized.as(applicationJsonHeader) + + val jsonQuery = jsValue + val ts = System.currentTimeMillis() + val props = "{}" + + Try { + val vertices = jsonQuery.as[List[JsValue]].flatMap { js => + val serviceName = (js \ "serviceName").as[String] + val columnName = (js \ "columnName").as[String] + for (id <- (js \ "ids").asOpt[List[JsValue]].getOrElse(List.empty[JsValue])) yield { + Management.toVertex(ts, "insert", id.toString, serviceName, columnName, props) + } + } + + s2.getVertices(vertices) map { vertices => + val json = PostProcess.verticesToJson(vertices) + jsonResponse(json, "result_size" -> calcSize(json).toString) + } + } recover { + case e: play.api.libs.json.JsResultException => + logger.error(s"$jsonQuery, $e", e) + badQueryExceptionResults(e) + case e: Exception => + logger.error(s"$jsonQuery, $e", e) + errorResults + } get + } +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/f2311f25/s2rest_play/app/controllers/TestController.scala ---------------------------------------------------------------------- diff --git a/s2rest_play/app/controllers/TestController.scala b/s2rest_play/app/controllers/TestController.scala new file mode 100644 index 0000000..8558ae5 --- /dev/null +++ b/s2rest_play/app/controllers/TestController.scala @@ -0,0 +1,24 @@ +package controllers + +import play.api.mvc.{Action, Controller} +import util.TestDataLoader + +import scala.concurrent.Future + + +object TestController extends Controller { + import ApplicationController._ + + def getRandomId() = withHeader(parse.anyContent) { request => + val id = TestDataLoader.randomId + Ok(s"${id}") + } + + def pingAsync() = Action.async(parse.json) { request => + Future.successful(Ok("Pong\n")) + } + + def ping() = Action(parse.json) { request => + Ok("Pong\n") + } +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/f2311f25/s2rest_play/app/controllers/VertexController.scala ---------------------------------------------------------------------- diff --git a/s2rest_play/app/controllers/VertexController.scala b/s2rest_play/app/controllers/VertexController.scala new file mode 100644 index 0000000..977c2fc --- /dev/null +++ b/s2rest_play/app/controllers/VertexController.scala @@ -0,0 +1,86 @@ +package controllers + + +import actors.QueueActor +import com.kakao.s2graph.core.rest.RequestParser +import com.kakao.s2graph.core.utils.logger +import com.kakao.s2graph.core.{ExceptionHandler, Graph, GraphExceptions} +import config.Config +import play.api.libs.json.{JsValue, Json} +import play.api.mvc.{Controller, Result} + +import scala.concurrent.Future + +object VertexController extends Controller { + private val s2: Graph = com.kakao.s2graph.rest.Global.s2graph + private val requestParser: RequestParser = com.kakao.s2graph.rest.Global.s2parser + + import ExceptionHandler._ + import controllers.ApplicationController._ + import play.api.libs.concurrent.Execution.Implicits._ + + def tryMutates(jsValue: JsValue, operation: String, serviceNameOpt: Option[String] = None, columnNameOpt: Option[String] = None, withWait: Boolean = false): Future[Result] = { + if (!Config.IS_WRITE_SERVER) Future.successful(Unauthorized) + else { + try { + val vertices = requestParser.toVertices(jsValue, operation, serviceNameOpt, columnNameOpt) + + for (vertex <- vertices) { + if (vertex.isAsync) + ExceptionHandler.enqueue(toKafkaMessage(Config.KAFKA_LOG_TOPIC_ASYNC, vertex, None)) + else + ExceptionHandler.enqueue(toKafkaMessage(Config.KAFKA_LOG_TOPIC, vertex, None)) + } + + //FIXME: + val verticesToStore = vertices.filterNot(v => v.isAsync) + + if (withWait) { + val rets = s2.mutateVertices(verticesToStore, withWait = true) + rets.map(Json.toJson(_)).map(jsonResponse(_)) + } else { + val rets = verticesToStore.map { vertex => QueueActor.router ! vertex; true } + Future.successful(jsonResponse(Json.toJson(rets))) + } + } catch { + case e: GraphExceptions.JsonParseException => Future.successful(BadRequest(s"e")) + case e: Exception => + logger.error(s"[Failed] tryMutates", e) + Future.successful(InternalServerError(s"${e.getStackTrace}")) + } + } + } + + def inserts() = withHeaderAsync(jsonParser) { request => + tryMutates(request.body, "insert") + } + + def insertsWithWait() = withHeaderAsync(jsonParser) { request => + tryMutates(request.body, "insert", withWait = true) + } + + def insertsSimple(serviceName: String, columnName: String) = withHeaderAsync(jsonParser) { request => + tryMutates(request.body, "insert", Some(serviceName), Some(columnName)) + } + + def deletes() = withHeaderAsync(jsonParser) { request => + tryMutates(request.body, "delete") + } + + def deletesWithWait() = withHeaderAsync(jsonParser) { request => + tryMutates(request.body, "delete", withWait = true) + } + + def deletesSimple(serviceName: String, columnName: String) = withHeaderAsync(jsonParser) { request => + tryMutates(request.body, "delete", Some(serviceName), Some(columnName)) + } + + def deletesAll() = withHeaderAsync(jsonParser) { request => + tryMutates(request.body, "deleteAll") + } + + def deletesAllSimple(serviceName: String, columnName: String) = withHeaderAsync(jsonParser) { request => + tryMutates(request.body, "deleteAll", Some(serviceName), Some(columnName)) + } + +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/f2311f25/s2rest_play/app/models/ExactCounterItem.scala ---------------------------------------------------------------------- diff --git a/s2rest_play/app/models/ExactCounterItem.scala b/s2rest_play/app/models/ExactCounterItem.scala new file mode 100644 index 0000000..244c046 --- /dev/null +++ b/s2rest_play/app/models/ExactCounterItem.scala @@ -0,0 +1,38 @@ +package models + +import play.api.libs.json.{Json, Writes} + +/** + * Created by alec on 15. 4. 15.. + */ +case class ExactCounterItem(ts: Long, count: Long, score: Double) + +case class ExactCounterIntervalItem(interval: String, dimension: Map[String, String], counter: Seq[ExactCounterItem]) + +case class ExactCounterResultMeta(service: String, action: String, item: String) + +case class ExactCounterResult(meta: ExactCounterResultMeta, data: Seq[ExactCounterIntervalItem]) + +object ExactCounterItem { + implicit val writes = new Writes[ExactCounterItem] { + def writes(item: ExactCounterItem) = Json.obj( + "ts" -> item.ts, + "time" -> tsFormat.format(item.ts), + "count" -> item.count, + "score" -> item.score + ) + } + implicit val reads = Json.reads[ExactCounterItem] +} + +object ExactCounterIntervalItem { + implicit val format = Json.format[ExactCounterIntervalItem] +} + +object ExactCounterResultMeta { + implicit val format = Json.format[ExactCounterResultMeta] +} + +object ExactCounterResult { + implicit val formats = Json.format[ExactCounterResult] +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/f2311f25/s2rest_play/app/models/RankCounterItem.scala ---------------------------------------------------------------------- diff --git a/s2rest_play/app/models/RankCounterItem.scala b/s2rest_play/app/models/RankCounterItem.scala new file mode 100644 index 0000000..aaa7df7 --- /dev/null +++ b/s2rest_play/app/models/RankCounterItem.scala @@ -0,0 +1,40 @@ +package models + +import play.api.libs.json.{Json, Writes} + +/** + * Created by alec on 15. 4. 15.. + */ +case class RankCounterItem(rank: Int, id: String, score: Double) + +case class RankCounterDimensionItem(interval: String, ts: Long, dimension: String, total: Double, ranks: Seq[RankCounterItem]) + +case class RankCounterResultMeta(service: String, action: String) + +case class RankCounterResult(meta: RankCounterResultMeta, data: Seq[RankCounterDimensionItem]) + +object RankCounterItem { + implicit val format = Json.format[RankCounterItem] +} + +object RankCounterDimensionItem { + implicit val writes = new Writes[RankCounterDimensionItem] { + def writes(item: RankCounterDimensionItem) = Json.obj( + "interval" -> item.interval, + "ts" -> item.ts, + "time" -> tsFormat.format(item.ts), + "dimension" -> item.dimension, + "total" -> item.total, + "ranks" -> item.ranks + ) + } + implicit val reads = Json.reads[RankCounterDimensionItem] +} + +object RankCounterResultMeta { + implicit val format = Json.format[RankCounterResultMeta] +} + +object RankCounterResult { + implicit val format = Json.format[RankCounterResult] +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/f2311f25/s2rest_play/app/models/package.scala ---------------------------------------------------------------------- diff --git a/s2rest_play/app/models/package.scala b/s2rest_play/app/models/package.scala new file mode 100644 index 0000000..17fa8e1 --- /dev/null +++ b/s2rest_play/app/models/package.scala @@ -0,0 +1,8 @@ +import java.text.SimpleDateFormat + +/** + * Created by alec on 15. 4. 20.. + */ +package object models { + def tsFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss") +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/f2311f25/s2rest_play/app/util/TestDataLoader.scala ---------------------------------------------------------------------- diff --git a/s2rest_play/app/util/TestDataLoader.scala b/s2rest_play/app/util/TestDataLoader.scala new file mode 100644 index 0000000..45a9b61 --- /dev/null +++ b/s2rest_play/app/util/TestDataLoader.scala @@ -0,0 +1,70 @@ +package util + +import java.io.File + +import scala.collection.mutable.{ArrayBuffer, HashMap, ListBuffer} +import scala.io.Source +import scala.util.Random + + +object TestDataLoader { + val step = 100 + val prob = 1.0 + val (testIds, testIdsHist, testIdsHistCnt) = loadSeeds("./talk_vertices.txt") + val maxId = testIds.length + // val randoms = (0 until 100).map{ i => new SecureRandom } + // val idx = new AtomicInteger(0) + // def randomId() = { + // val r = randoms(idx.getAndIncrement() % randoms.size) + // testAccountIds(r.nextInt(maxId)) + // } + def randomId(histStep: Int) = { + for { + maxId <- testIdsHistCnt.get(histStep) + rIdx = Random.nextInt(maxId.toInt) + hist <- testIdsHist.get(histStep) + id = hist(rIdx) + } yield { +// logger.debug(s"randomId: $histStep = $id[$rIdx / $maxId]") + id + } + } + def randomId() = { + val id = testIds(Random.nextInt(maxId)) + // logger.debug(s"$id") + id + } + private def loadSeeds(filePath: String) = { + val histogram = new HashMap[Long, ListBuffer[Long]] + val histogramCnt = new HashMap[Long, Long] + val ids = new ArrayBuffer[Long] + + var idx = 0 +// logger.debug(s"$filePath start to load file.") + for (line <- Source.fromFile(new File(filePath)).getLines) { + // testAccountIds(idx) = line.toLong +// if (idx % 10000 == 0) logger.debug(s"$idx") + idx += 1 + + val parts = line.split("\\t") + val id = parts.head.toLong + val count = parts.last.toLong / step + if (count > 1 && Random.nextDouble < prob) { + histogram.get(count) match { + case None => + histogram.put(count, new ListBuffer[Long]) + histogram.get(count).get += id + histogramCnt.put(count, 1) + case Some(existed) => + existed += id + histogramCnt.put(count, histogramCnt.getOrElse(count, 0L) + 1L) + } + ids += id + } + + } +// logger.debug(s"upload $filePath finished.") +// logger.debug(s"${histogram.size}") + (ids, histogram.map(t => (t._1 -> t._2.toArray[Long])), histogramCnt) + } +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/f2311f25/s2rest_play/build.sbt ---------------------------------------------------------------------- diff --git a/s2rest_play/build.sbt b/s2rest_play/build.sbt new file mode 100755 index 0000000..6e0f0bd --- /dev/null +++ b/s2rest_play/build.sbt @@ -0,0 +1,12 @@ +name := "s2rest_play" + +scalacOptions in Test ++= Seq("-Yrangepos") + +libraryDependencies ++= Seq( + ws, + filters, + "com.github.danielwegener" % "logback-kafka-appender" % "0.0.3" +) + +enablePlugins(JavaServerAppPackaging) + http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/f2311f25/s2rest_play/conf/application.conf ---------------------------------------------------------------------- diff --git a/s2rest_play/conf/application.conf b/s2rest_play/conf/application.conf new file mode 100644 index 0000000..e69de29
