merge S2GRAPH-249
Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/b8ab86dd Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/b8ab86dd Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/b8ab86dd Branch: refs/heads/master Commit: b8ab86dd39fd3810afcf44160324fa2dc6f5ee3f Parents: 4154bbe Author: DO YUNG YOON <[email protected]> Authored: Thu Nov 29 10:51:20 2018 +0900 Committer: DO YUNG YOON <[email protected]> Committed: Thu Nov 29 10:51:20 2018 +0900 ---------------------------------------------------------------------- .../org/apache/s2graph/core/Management.scala | 44 ++++ .../org/apache/s2graph/core/schema/Label.scala | 38 ++-- .../apache/s2graph/http/S2GraphAdminRoute.scala | 224 ++++++++++++++++++- .../s2graph/http/S2GraphMutateRoute.scala | 119 ++++++++++ .../scala/org/apache/s2graph/http/Server.scala | 4 +- .../apache/s2graph/http/AdminRouteSpec.scala | 41 +++- .../apache/s2graph/http/MutateRouteSpec.scala | 52 +++++ .../rest/play/controllers/AdminController.scala | 32 +-- 8 files changed, 492 insertions(+), 62 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8ab86dd/s2core/src/main/scala/org/apache/s2graph/core/Management.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/Management.scala b/s2core/src/main/scala/org/apache/s2graph/core/Management.scala index 0c41ee3..78edf80 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/Management.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/Management.scala @@ -66,18 +66,56 @@ object Management { } object JsonModel { + import play.api.libs.functional.syntax._ case class Prop(name: String, defaultValue: String, dataType: String, storeInGlobalIndex: Boolean = false) object Prop extends ((String, String, String, Boolean) => Prop) case class Index(name: String, propNames: Seq[String], direction: Option[Int] = None, options: Option[String] = None) + + case class HTableParams(cluster: String, hTableName: String, + preSplitSize: Int, hTableTTL: Option[Int], compressionAlgorithm: Option[String]) { + + override def toString(): String = { + s"""HtableParams + |-- cluster : $cluster + |-- hTableName : $hTableName + |-- preSplitSize : $preSplitSize + |-- hTableTTL : $hTableTTL + |-- compressionAlgorithm : $compressionAlgorithm + |""".stripMargin + } + } + + implicit object HTableParamsJsonConverter extends Format[HTableParams] { + def reads(json: JsValue): JsResult[HTableParams] = ( + (__ \ "cluster").read[String] and + (__ \ "hTableName").read[String] and + (__ \ "preSplitSize").read[Int] and + (__ \ "hTableTTL").readNullable[Int] and + (__ \ "compressionAlgorithm").readNullable[String])(HTableParams.apply _).reads(json) + + def writes(o: HTableParams): JsValue = Json.obj( + "cluster" -> o.cluster, + "hTableName" -> o.hTableName, + "preSplitSize" -> o.preSplitSize, + "hTableTTL" -> o.hTableTTL, + "compressionAlgorithm" -> o.compressionAlgorithm + ) + } } def findService(serviceName: String) = { Service.findByName(serviceName, useCache = false) } + def findServiceColumn(serviceName: String, columnName: String): Option[ServiceColumn] = { + Service.findByName(serviceName, useCache = false).flatMap { service => + ServiceColumn.find(service.id.get, columnName, useCache = false) + } + } + def deleteService(serviceName: String) = { Service.findByName(serviceName).foreach { service => // service.deleteAll() @@ -133,6 +171,12 @@ object Management { Label.findByName(labelName, useCache = useCache) } + def findLabels(serviceName: String, useCache: Boolean = false): Seq[Label] = { + Service.findByName(serviceName, useCache = useCache).map { service => + Label.findBySrcServiceId(service.id.get, useCache = useCache) + }.getOrElse(Nil) + } + def deleteLabel(labelName: String): Try[Label] = { Schema withTx { implicit session => val label = Label.findByName(labelName, useCache = false).getOrElse(throw GraphExceptions.LabelNotExistException(labelName)) http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8ab86dd/s2core/src/main/scala/org/apache/s2graph/core/schema/Label.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/schema/Label.scala b/s2core/src/main/scala/org/apache/s2graph/core/schema/Label.scala index a359958..2780475 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/schema/Label.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/schema/Label.scala @@ -126,44 +126,50 @@ object Label extends SQLSyntaxSupport[Label] { else sql.get } - def findByTgtColumnId(columnId: Int)(implicit session: DBSession = AutoSession): List[Label] = { + def findByTgtColumnId(columnId: Int, useCache: Boolean = true)(implicit session: DBSession = AutoSession): List[Label] = { val cacheKey = className + "tgtColumnId=" + columnId val col = ServiceColumn.findById(columnId) - withCaches(cacheKey)( - sql""" + lazy val sql = sql""" select * from labels where tgt_column_name = ${col.columnName} and service_id = ${col.serviceId} and deleted_at is null - """.map { rs => Label(rs) }.list().apply()) + """.map { rs => Label(rs) }.list().apply() + + if (useCache) withCaches(cacheKey)(sql) + else sql } - def findBySrcColumnId(columnId: Int)(implicit session: DBSession = AutoSession): List[Label] = { + def findBySrcColumnId(columnId: Int, useCache: Boolean = true)(implicit session: DBSession = AutoSession): List[Label] = { val cacheKey = className + "srcColumnId=" + columnId val col = ServiceColumn.findById(columnId) - withCaches(cacheKey)( - sql""" + lazy val sql = sql""" select * from labels where src_column_name = ${col.columnName} and service_id = ${col.serviceId} and deleted_at is null - """.map { rs => Label(rs) }.list().apply()) + """.map { rs => Label(rs) }.list().apply() + + if (useCache) withCaches(cacheKey)(sql) + else sql } - def findBySrcServiceId(serviceId: Int)(implicit session: DBSession = AutoSession): List[Label] = { + def findBySrcServiceId(serviceId: Int, useCache: Boolean = true)(implicit session: DBSession = AutoSession): List[Label] = { val cacheKey = className + "srcServiceId=" + serviceId - withCaches(cacheKey)( - sql"""select * from labels where src_service_id = ${serviceId} and deleted_at is null""".map { rs => Label(rs) }.list().apply - ) + lazy val sql = sql"""select * from labels where src_service_id = ${serviceId} and deleted_at is null""".map { rs => Label(rs) }.list().apply + + if (useCache) withCaches(cacheKey)(sql) + else sql } - def findByTgtServiceId(serviceId: Int)(implicit session: DBSession = AutoSession): List[Label] = { + def findByTgtServiceId(serviceId: Int, useCache: Boolean = true)(implicit session: DBSession = AutoSession): List[Label] = { val cacheKey = className + "tgtServiceId=" + serviceId - withCaches(cacheKey)( - sql"""select * from labels where tgt_service_id = ${serviceId} and deleted_at is null""".map { rs => Label(rs) }.list().apply - ) + lazy val sql = sql"""select * from labels where tgt_service_id = ${serviceId} and deleted_at is null""".map { rs => Label(rs) }.list().apply + + if (useCache) withCaches(cacheKey)(sql) + else sql } def insertAll(labelName: String, srcServiceName: String, srcColumnName: String, srcColumnType: String, http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8ab86dd/s2http/src/main/scala/org/apache/s2graph/http/S2GraphAdminRoute.scala ---------------------------------------------------------------------- diff --git a/s2http/src/main/scala/org/apache/s2graph/http/S2GraphAdminRoute.scala b/s2http/src/main/scala/org/apache/s2graph/http/S2GraphAdminRoute.scala index 8efd2ab..9bf7eb4 100644 --- a/s2http/src/main/scala/org/apache/s2graph/http/S2GraphAdminRoute.scala +++ b/s2http/src/main/scala/org/apache/s2graph/http/S2GraphAdminRoute.scala @@ -3,10 +3,10 @@ package org.apache.s2graph.http import akka.http.scaladsl.model._ import org.apache.s2graph.core.rest.RequestParser import org.apache.s2graph.core.{Management, S2Graph} - import akka.http.scaladsl.server.Route import akka.http.scaladsl.server.Directives._ - +import org.apache.s2graph.core.Management.JsonModel.HTableParams +import org.apache.s2graph.core.schema._ import org.slf4j.LoggerFactory import play.api.libs.json._ @@ -53,6 +53,16 @@ object S2GraphAdminRoute { toHttpEntity(Option(res), status = status) } + + def toHttpEntity[A: AdminMessageFormatter](ls: Seq[A], status: StatusCode = StatusCodes.OK): HttpResponse = { + val ev = implicitly[AdminMessageFormatter[A]] + val res = ls.map(ev.toJson) + + HttpResponse( + status = status, + entity = HttpEntity(ContentTypes.`application/json`, res.toString) + ) + } } trait S2GraphAdminRoute extends PlayJsonSupport { @@ -66,18 +76,40 @@ trait S2GraphAdminRoute extends PlayJsonSupport { lazy val requestParser: RequestParser = new RequestParser(s2graph) // routes impl + /* GET */ + // GET /graphs/getService/:serviceName + lazy val getService = path("getService" / Segment) { serviceName => + val serviceOpt = Management.findService(serviceName) + + complete(toHttpEntity(serviceOpt, message = s"Service not found: ${serviceName}")) + } + + // GET /graphs/getServiceColumn/:serviceName/:columnName + lazy val getServiceColumn = path("getServiceColumn" / Segments) { params => + val (serviceName, columnName) = params match { + case s :: c :: Nil => (s, c) + } + + val ret = Management.findServiceColumn(serviceName, columnName) + complete(toHttpEntity(ret, message = s"ServiceColumn not found: ${serviceName}, ${columnName}")) + } + + // GET /graphs/getLabel/:labelName lazy val getLabel = path("getLabel" / Segment) { labelName => val labelOpt = Management.findLabel(labelName) complete(toHttpEntity(labelOpt, message = s"Label not found: ${labelName}")) } - lazy val getService = path("getService" / Segment) { serviceName => - val serviceOpt = Management.findService(serviceName) + // GET /graphs/getLabels/:serviceName + lazy val getLabels = path("getLabels" / Segment) { serviceName => + val ret = Management.findLabels(serviceName) - complete(toHttpEntity(serviceOpt, message = s"Service not found: ${serviceName}")) + complete(toHttpEntity(ret)) } + /* POST */ + // POST /graphs/createService lazy val createService = path("createService") { entity(as[JsValue]) { params => @@ -91,28 +123,204 @@ trait S2GraphAdminRoute extends PlayJsonSupport { } } + // POST /graphs/createServiceColumn + lazy val createServiceColumn = path("createServiceColumn") { + entity(as[JsValue]) { params => + + val parseTry = requestParser.toServiceColumnElements(params) + val serviceColumnTry = for { + (serviceName, columnName, columnType, props) <- parseTry + serviceColumn <- Try(management.createServiceColumn(serviceName, columnName, columnType, props)) + } yield serviceColumn + + complete(toHttpEntity(serviceColumnTry)) + } + } + + // POST /graphs/createLabel lazy val createLabel = path("createLabel") { entity(as[JsValue]) { params => + val labelTry = requestParser.toLabelElements(params) + + complete(toHttpEntity(labelTry)) + } + } + // POST /graphs/addIndex + lazy val addIndex = path("addIndex") { + entity(as[JsValue]) { params => val labelTry = for { - label <- requestParser.toLabelElements(params) + (labelName, indices) <- requestParser.toIndexElements(params) + label <- Management.addIndex(labelName, indices) } yield label complete(toHttpEntity(labelTry)) } } + // POST /graphs/addProp/:labelName + lazy val addProp = path("addProp" / Segment) { labelName => + entity(as[JsValue]) { params => + val labelMetaTry = for { + prop <- requestParser.toPropElements(params) + labelMeta <- Management.addProp(labelName, prop) + } yield labelMeta + + complete(toHttpEntity(labelMetaTry)) + } + } + + // POST /graphs/addServiceColumnProp/:serviceName/:columnName + lazy val addServiceColumnProp = path("addServiceColumnProp" / Segments) { params => + val (serviceName, columnName, storeInGlobalIndex) = params match { + case s :: c :: Nil => (s, c, false) + case s :: c :: i :: Nil => (s, c, i.toBoolean) + } + + entity(as[JsValue]) { params => + val columnMetaOpt = for { + service <- Service.findByName(serviceName) + serviceColumn <- ServiceColumn.find(service.id.get, columnName) + prop <- requestParser.toPropElements(params).toOption + } yield { + ColumnMeta.findOrInsert(serviceColumn.id.get, prop.name, prop.dataType, prop.defaultValue, storeInGlobalIndex) + } + + complete(toHttpEntity(columnMetaOpt, message = s"can`t find service with $serviceName or can`t find serviceColumn with $columnName")) + } + } + + // POST /graphs/createHTable + lazy val createHTable = path("createHTable") { + entity(as[JsValue]) { params => + params.validate[HTableParams] match { + case JsSuccess(hTableParams, _) => { + management.createStorageTable(hTableParams.cluster, hTableParams.hTableName, List("e", "v"), + hTableParams.preSplitSize, hTableParams.hTableTTL, + hTableParams.compressionAlgorithm.getOrElse(Management.DefaultCompressionAlgorithm)) + + complete(toHttpEntity(None, status = StatusCodes.OK, message = "created")) + } + case err@JsError(_) => complete(toHttpEntity(None, status = StatusCodes.BadRequest, message = Json.toJson(err).toString)) + } + } + } + + // POST /graphs/copyLabel/:oldLabelName/:newLabelName + lazy val copyLabel = path("copyLabel" / Segments) { params => + val (oldLabelName, newLabelName) = params match { + case oldLabel :: newLabel :: Nil => (oldLabel, newLabel) + } + + val copyTry = management.copyLabel(oldLabelName, newLabelName, Some(newLabelName)) + + complete(toHttpEntity(copyTry)) + } + + // POST /graphs/renameLabel/:oldLabelName/:newLabelName + lazy val renameLabel = path("renameLabel" / Segments) { params => + val (oldLabelName, newLabelName) = params match { + case oldLabel :: newLabel :: Nil => (oldLabel, newLabel) + } + + Label.findByName(oldLabelName) match { + case None => complete(toHttpEntity(None, status = StatusCodes.NotFound, message = s"Label $oldLabelName not found.")) + case Some(label) => + Management.updateLabelName(oldLabelName, newLabelName) + + complete(toHttpEntity(None, message = s"${label} was updated.")) + } + } + + // POST /graphs/swapLabels/:leftLabelName/:rightLabelName + lazy val swapLabel = path("swapLabel" / Segments) { params => + val (leftLabelName, rightLabelName) = params match { + case left :: right :: Nil => (left, right) + } + + val left = Label.findByName(leftLabelName, useCache = false) + val right = Label.findByName(rightLabelName, useCache = false) + // verify same schema + + (left, right) match { + case (Some(l), Some(r)) => + Management.swapLabelNames(leftLabelName, rightLabelName) + + complete(toHttpEntity(None, message = s"Labels were swapped.")) + case _ => + complete(toHttpEntity(None, status = StatusCodes.NotFound, message = s"Label ${leftLabelName} or ${rightLabelName} not found.")) + } + } + + // POST /graphs/updateHTable/:labelName/:newHTableName + lazy val updateHTable = path("updateHTable" / Segments) { params => + val (labelName, newHTableName) = params match { + case l :: h :: Nil => (l, h) + } + + val updateTry = Management.updateHTable(labelName, newHTableName) + + complete(toHttpEntity(updateTry)) + } + + /* PUT */ + // PUT /graphs/deleteLabelReally/:labelName + lazy val deleteLabelReally = path("deleteLabelReally" / Segment) { labelName => + val ret = Management.deleteLabel(labelName).toOption + + complete(toHttpEntity(ret, message = s"Label not found: ${labelName}")) + } + + // PUT /graphs/markDeletedLabel/:labelName + lazy val markDeletedLabel = path("markDeletedLabel" / Segment) { labelName => + val ret = Management.markDeletedLabel(labelName).toOption + + complete(toHttpEntity(ret, message = s"Label not found: ${labelName}")) + } + + // PUT /graphs/deleteServiceColumn/:serviceName/:columnName + lazy val deleteServiceColumn = path("deleteServiceColumn" / Segments) { params => + val (serviceName, columnName) = params match { + case s :: c :: Nil => (s, c) + } + + val ret = Management.deleteColumn(serviceName, columnName).toOption + + complete(toHttpEntity(ret, message = s"ServiceColumn not found: ${serviceName}, ${columnName}")) + } + + // TODO: + // delete service? + // PUT /graphs/loadCache + // expose routes lazy val adminRoute: Route = get { concat( getService, - getLabel + getServiceColumn, + getLabel, + getLabels ) } ~ post { concat( createService, - createLabel + createServiceColumn, + createLabel, + addIndex, + addProp, + addServiceColumnProp, + createHTable, + copyLabel, + renameLabel, + swapLabel, + updateHTable + ) + } ~ put { + concat( + deleteLabelReally, + markDeletedLabel, + deleteServiceColumn ) } } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8ab86dd/s2http/src/main/scala/org/apache/s2graph/http/S2GraphMutateRoute.scala ---------------------------------------------------------------------- diff --git a/s2http/src/main/scala/org/apache/s2graph/http/S2GraphMutateRoute.scala b/s2http/src/main/scala/org/apache/s2graph/http/S2GraphMutateRoute.scala new file mode 100644 index 0000000..fc3b768 --- /dev/null +++ b/s2http/src/main/scala/org/apache/s2graph/http/S2GraphMutateRoute.scala @@ -0,0 +1,119 @@ +package org.apache.s2graph.http + +import akka.http.scaladsl.model.{ContentTypes, HttpEntity, HttpResponse, StatusCodes} +import akka.http.scaladsl.server.Directives._ +import akka.http.scaladsl.server.{ExceptionHandler, Route} +import com.fasterxml.jackson.core.JsonParseException +import org.apache.s2graph.core.rest.RequestParser +import org.apache.s2graph.core.storage.MutateResponse +import org.apache.s2graph.core.{GraphElement, S2Graph} +import org.slf4j.LoggerFactory +import play.api.libs.json.{JsValue, Json} + +import scala.concurrent.{ExecutionContext, Future} + +trait S2GraphMutateRoute { + + val s2graph: S2Graph + val logger = LoggerFactory.getLogger(this.getClass) + lazy val parser = new RequestParser(s2graph) + + // lazy val requestParser = new RequestParser(s2graph) + lazy val exceptionHandler = ExceptionHandler { + case ex: JsonParseException => + complete(StatusCodes.BadRequest -> ex.getMessage) + case ex: java.lang.IllegalArgumentException => + complete(StatusCodes.BadRequest -> ex.getMessage) + } + + lazy val mutateVertex = path("vertex" / Segments) { params => + val (operation, serviceNameOpt, columnNameOpt) = params match { + case operation :: serviceName :: columnName :: Nil => + (operation, Option(serviceName), Option(columnName)) + case operation :: Nil => + (operation, None, None) + } + + entity(as[String]) { body => + val payload = Json.parse(body) + + implicit val ec = s2graph.ec + + val future = vertexMutate(payload, operation, serviceNameOpt, columnNameOpt).map { mutateResponses => + HttpResponse( + status = StatusCodes.OK, + entity = HttpEntity(ContentTypes.`application/json`, Json.toJson(mutateResponses).toString) + ) + } + + complete(future) + } + } + + lazy val mutateEdge = path("edge" / Segment) { operation => + entity(as[String]) { body => + val payload = Json.parse(body) + + implicit val ec = s2graph.ec + + val future = edgeMutate(payload, operation, withWait = true).map { mutateResponses => + HttpResponse( + status = StatusCodes.OK, + entity = HttpEntity(ContentTypes.`application/json`, Json.toJson(mutateResponses).toString) + ) + } + + complete(future) + } + } + + def vertexMutate(jsValue: JsValue, + operation: String, + serviceNameOpt: Option[String] = None, + columnNameOpt: Option[String] = None, + withWait: Boolean = true)(implicit ec: ExecutionContext): Future[Seq[Boolean]] = { + val vertices = parser.toVertices(jsValue, operation, serviceNameOpt, columnNameOpt) + + val verticesToStore = vertices.filterNot(_.isAsync) + + s2graph.mutateVertices(verticesToStore, withWait).map(_.map(_.isSuccess)) + } + + def edgeMutate(elementsWithTsv: Seq[(GraphElement, String)], + withWait: Boolean)(implicit ec: ExecutionContext): Future[Seq[Boolean]] = { + val elementWithIdxs = elementsWithTsv.zipWithIndex + + val (elementSync, elementAsync) = elementWithIdxs.partition { case ((element, tsv), idx) => + !element.isAsync + } + val retToSkip = elementAsync.map(_._2 -> MutateResponse.Success) + val (elementsToStore, _) = elementSync.map(_._1).unzip + val elementsIdxToStore = elementSync.map(_._2) + + s2graph.mutateElements(elementsToStore, withWait).map { mutateResponses => + elementsIdxToStore.zip(mutateResponses) ++ retToSkip + }.map(_.sortBy(_._1).map(_._2.isSuccess)) + } + + def edgeMutate(jsValue: JsValue, + operation: String, + withWait: Boolean)(implicit ec: ExecutionContext): Future[Seq[Boolean]] = { + val edgesWithTsv = parser.parseJsonFormat(jsValue, operation) + edgeMutate(edgesWithTsv, withWait) + } + + // expose routes + lazy val mutateRoute: Route = + post { + concat( + handleExceptions(exceptionHandler) { + mutateVertex + }, + handleExceptions(exceptionHandler) { + mutateEdge + } + ) + } + +} + http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8ab86dd/s2http/src/main/scala/org/apache/s2graph/http/Server.scala ---------------------------------------------------------------------- diff --git a/s2http/src/main/scala/org/apache/s2graph/http/Server.scala b/s2http/src/main/scala/org/apache/s2graph/http/Server.scala index 82d2343..c26e314 100644 --- a/s2http/src/main/scala/org/apache/s2graph/http/Server.scala +++ b/s2http/src/main/scala/org/apache/s2graph/http/Server.scala @@ -35,7 +35,8 @@ import org.slf4j.LoggerFactory object Server extends App with S2GraphTraversalRoute - with S2GraphAdminRoute { + with S2GraphAdminRoute + with S2GraphMutateRoute { implicit val system: ActorSystem = ActorSystem("S2GraphHttpServer") implicit val materializer: ActorMaterializer = ActorMaterializer() @@ -54,6 +55,7 @@ object Server extends App // Allows you to determine routes to expose according to external settings. lazy val routes: Route = concat( pathPrefix("graphs")(traversalRoute), + pathPrefix("mutate")(mutateRoute), pathPrefix("admin")(adminRoute), get(complete(health)) ) http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8ab86dd/s2http/src/test/scala/org/apache/s2graph/http/AdminRouteSpec.scala ---------------------------------------------------------------------- diff --git a/s2http/src/test/scala/org/apache/s2graph/http/AdminRouteSpec.scala b/s2http/src/test/scala/org/apache/s2graph/http/AdminRouteSpec.scala index 9239598..eade7e6 100644 --- a/s2http/src/test/scala/org/apache/s2graph/http/AdminRouteSpec.scala +++ b/s2http/src/test/scala/org/apache/s2graph/http/AdminRouteSpec.scala @@ -4,17 +4,16 @@ import akka.http.scaladsl.marshalling.Marshal import akka.http.scaladsl.model._ import akka.http.scaladsl.testkit.ScalatestRouteTest import com.typesafe.config.ConfigFactory +import org.apache.s2graph.core.Management.JsonModel.Prop import org.apache.s2graph.core.S2Graph import org.scalatest.concurrent.ScalaFutures import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpec} import org.slf4j.LoggerFactory import play.api.libs.json.{JsString, JsValue, Json} -class AdminRoutesSpec extends WordSpec with Matchers with ScalaFutures with ScalatestRouteTest with S2GraphAdminRoute with BeforeAndAfterAll with PlayJsonSupport { - +class AdminRoutesSpec extends WordSpec with Matchers with ScalaFutures with ScalatestRouteTest with S2GraphAdminRoute with BeforeAndAfterAll { val config = ConfigFactory.load() val s2graph = new S2Graph(config) - override val logger = LoggerFactory.getLogger(this.getClass) override def afterAll(): Unit = { @@ -23,14 +22,17 @@ class AdminRoutesSpec extends WordSpec with Matchers with ScalaFutures with Scal lazy val routes = adminRoute + val serviceName = "kakaoFavorites" + val columnName = "userName" + "AdminRoute" should { "be able to create service (POST /createService)" in { val serviceParam = Json.obj( - "serviceName" -> "kakaoFavorites", + "serviceName" -> serviceName, "compressionAlgorithm" -> "gz" ) - val serviceEntity = Marshal(serviceParam).to[MessageEntity].futureValue + val serviceEntity = Marshal(serviceParam.toString).to[MessageEntity].futureValue val request = Post("/createService").withEntity(serviceEntity) request ~> routes ~> check { @@ -45,7 +47,7 @@ class AdminRoutesSpec extends WordSpec with Matchers with ScalaFutures with Scal } "return service if present (GET /getService/{serviceName})" in { - val request = HttpRequest(uri = "/getService/kakaoFavorites") + val request = HttpRequest(uri = s"/getService/$serviceName") request ~> routes ~> check { status should ===(StatusCodes.OK) @@ -56,6 +58,33 @@ class AdminRoutesSpec extends WordSpec with Matchers with ScalaFutures with Scal (response \\ "name").head should ===(JsString("kakaoFavorites")) } } + + "be able to create serviceColumn (POST /createServiceColumn)" in { + val serviceColumnParam = Json.obj( + "serviceName" -> serviceName, + "columnName" -> columnName, + "columnType" -> "string", + "props" -> Json.toJson( + Seq( + Json.obj("name" -> "age", "defaultValue" -> "-1", "dataType" -> "integer") + ) + ) + ) + + val serviceColumnEntity = Marshal(serviceColumnParam.toString).to[MessageEntity].futureValue + val request = Post("/createServiceColumn").withEntity(serviceColumnEntity) + + request ~> routes ~> check { + status should ===(StatusCodes.Created) + contentType should ===(ContentTypes.`application/json`) + + val response = entityAs[JsValue] + + (response \\ "serviceName").head should ===(JsString("kakaoFavorites")) + (response \\ "columnName").head should ===(JsString("userName")) + (response \\ "status").head should ===(JsString("ok")) + } + } } } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8ab86dd/s2http/src/test/scala/org/apache/s2graph/http/MutateRouteSpec.scala ---------------------------------------------------------------------- diff --git a/s2http/src/test/scala/org/apache/s2graph/http/MutateRouteSpec.scala b/s2http/src/test/scala/org/apache/s2graph/http/MutateRouteSpec.scala new file mode 100644 index 0000000..f823cd5 --- /dev/null +++ b/s2http/src/test/scala/org/apache/s2graph/http/MutateRouteSpec.scala @@ -0,0 +1,52 @@ +package org.apache.s2graph.http + +import akka.http.scaladsl.marshalling.Marshal +import akka.http.scaladsl.model._ +import akka.http.scaladsl.testkit.ScalatestRouteTest +import com.typesafe.config.ConfigFactory +import org.apache.s2graph.core.S2Graph +import org.scalatest.concurrent.ScalaFutures +import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpec} +import org.slf4j.LoggerFactory +import play.api.libs.json.{JsValue, Json} + +class MutateRouteSpec extends WordSpec with Matchers with PlayJsonSupport with ScalaFutures with ScalatestRouteTest with S2GraphMutateRoute with BeforeAndAfterAll { + val config = ConfigFactory.load() + val s2graph = new S2Graph(config) + override val logger = LoggerFactory.getLogger(this.getClass) + + override def afterAll(): Unit = { + s2graph.shutdown() + } + + lazy val routes = mutateRoute + + val serviceName = "kakaoFavorites" + val columnName = "userName" + + "MutateRoute" should { + "be able to insert vertex (POST /mutate/vertex/insert)" in { + // {"timestamp": 10, "serviceName": "s2graph", "columnName": "user", "id": 1, "props": {}} + val param = Json.obj( + "timestamp" -> 10, + "serviceName" -> serviceName, + "columnName" -> columnName, + "id" -> "user_a", + "props" -> Json.obj( + "age" -> 20 + ) + ) + val entity = Marshal(param.toString).to[MessageEntity].futureValue + val request = Post("/vertex/insert").withEntity(entity) + + request ~> routes ~> check { + status should ===(StatusCodes.OK) + contentType should ===(ContentTypes.`application/json`) + + val response = entityAs[JsValue] + response should ===(Json.toJson(Seq(true))) + } + } + } +} + http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8ab86dd/s2rest_play/app/org/apache/s2graph/rest/play/controllers/AdminController.scala ---------------------------------------------------------------------- diff --git a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/AdminController.scala b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/AdminController.scala index 00702cf..fdd9b63 100644 --- a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/AdminController.scala +++ b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/AdminController.scala @@ -20,6 +20,7 @@ package org.apache.s2graph.rest.play.controllers import org.apache.s2graph.core.Management +import org.apache.s2graph.core.Management.JsonModel.HTableParams import org.apache.s2graph.core.schema._ import org.apache.s2graph.core.rest.RequestParser import org.apache.s2graph.core.utils.logger @@ -392,37 +393,6 @@ object AdminController extends Controller { } } - case class HTableParams(cluster: String, hTableName: String, - preSplitSize: Int, hTableTTL: Option[Int], compressionAlgorithm: Option[String]) { - - override def toString(): String = { - s"""HtableParams - |-- cluster : $cluster - |-- hTableName : $hTableName - |-- preSplitSize : $preSplitSize - |-- hTableTTL : $hTableTTL - |-- compressionAlgorithm : $compressionAlgorithm - |""".stripMargin - } - } - - implicit object HTableParamsJsonConverter extends Format[HTableParams] { - def reads(json: JsValue): JsResult[HTableParams] = ( - (__ \ "cluster").read[String] and - (__ \ "hTableName").read[String] and - (__ \ "preSplitSize").read[Int] and - (__ \ "hTableTTL").readNullable[Int] and - (__ \ "compressionAlgorithm").readNullable[String])(HTableParams.apply _).reads(json) - - def writes(o: HTableParams): JsValue = Json.obj( - "cluster" -> o.cluster, - "hTableName" -> o.hTableName, - "preSplitSize" -> o.preSplitSize, - "hTableTTL" -> o.hTableTTL, - "compressionAlgorithm" -> o.compressionAlgorithm - ) - } - implicit object JsErrorJsonWriter extends Writes[JsError] { def writes(o: JsError): JsValue = Json.obj( "errors" -> JsArray(
