Repository: incubator-s2graph Updated Branches: refs/heads/master 8dbb9a3ee -> 66bdf1bc0
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/66bdf1bc/s2core/src/test/scala/org/apache/s2graph/core/storage/hbase/IndexEdgeTest.scala ---------------------------------------------------------------------- diff --git a/s2core/src/test/scala/org/apache/s2graph/core/storage/hbase/IndexEdgeTest.scala b/s2core/src/test/scala/org/apache/s2graph/core/storage/hbase/IndexEdgeTest.scala index ea90061..8e80fd9 100644 --- a/s2core/src/test/scala/org/apache/s2graph/core/storage/hbase/IndexEdgeTest.scala +++ b/s2core/src/test/scala/org/apache/s2graph/core/storage/hbase/IndexEdgeTest.scala @@ -19,15 +19,16 @@ package org.apache.s2graph.core.storage.hbase -import org.apache.s2graph.core.mysqls.{Label, LabelIndex, LabelMeta} +import org.apache.s2graph.core.mysqls.{Model, Label, LabelIndex, LabelMeta} import org.apache.s2graph.core.types._ -import org.apache.s2graph.core.{IndexEdge, TestCommonWithModels, Vertex} +import org.apache.s2graph.core.{QueryParam, IndexEdge, TestCommonWithModels, Vertex} import org.scalatest.{FunSuite, Matchers} class IndexEdgeTest extends FunSuite with Matchers with TestCommonWithModels { initTests() + val testLabelMeta = LabelMeta(Option(-1), labelV2.id.get, "test", 1.toByte, "0.0", "double") /** * check if storage serializer/deserializer can translate from/to bytes array. * @param l: label for edge. @@ -35,17 +36,19 @@ class IndexEdgeTest extends FunSuite with Matchers with TestCommonWithModels { * @param to: to VertexId for edge. * @param props: expected props of edge. */ - def check(l: Label, ts: Long, to: InnerValLike, props: Map[Byte, InnerValLikeWithTs]): Unit = { + def check(l: Label, ts: Long, to: InnerValLike, props: Map[LabelMeta, InnerValLikeWithTs]): Unit = { val from = InnerVal.withLong(1, l.schemaVersion) val vertexId = SourceVertexId(HBaseType.DEFAULT_COL_ID, from) val tgtVertexId = TargetVertexId(HBaseType.DEFAULT_COL_ID, to) val vertex = Vertex(vertexId, ts) val tgtVertex = Vertex(tgtVertexId, ts) val labelWithDir = LabelWithDirection(l.id.get, 0) + val labelOpt = Option(l) + + val indexEdge = IndexEdge(vertex, tgtVertex, l, labelWithDir.dir, 0, ts, LabelIndex.DefaultSeq, props, tsInnerValOpt = Option(InnerVal.withLong(ts, l.schemaVersion))) + val _indexEdgeOpt = graph.getStorage(l).indexEdgeDeserializer(l.schemaVersion).fromKeyValues(labelOpt, + graph.getStorage(l).indexEdgeSerializer(indexEdge).toKeyValues, l.schemaVersion, None) - val indexEdge = IndexEdge(vertex, tgtVertex, labelWithDir, 0, ts, LabelIndex.DefaultSeq, props) - val _indexEdgeOpt = graph.storage.indexEdgeDeserializer(l.schemaVersion).fromKeyValues(queryParam, - graph.storage.indexEdgeSerializer(indexEdge).toKeyValues, l.schemaVersion, None) _indexEdgeOpt should not be empty indexEdge should be(_indexEdgeOpt.get) @@ -60,8 +63,8 @@ class IndexEdgeTest extends FunSuite with Matchers with TestCommonWithModels { } { val to = InnerVal.withLong(101, l.schemaVersion) val tsInnerValWithTs = InnerValLikeWithTs.withLong(ts, ts, l.schemaVersion) - val props = Map(LabelMeta.timeStampSeq -> tsInnerValWithTs, - 1.toByte -> InnerValLikeWithTs.withDouble(2.1, ts, l.schemaVersion)) + val props = Map(LabelMeta.timestamp -> tsInnerValWithTs, + testLabelMeta -> InnerValLikeWithTs.withDouble(2.1, ts, l.schemaVersion)) check(l, ts, to, props) } @@ -75,8 +78,8 @@ class IndexEdgeTest extends FunSuite with Matchers with TestCommonWithModels { val to = InnerVal.withStr("0", l.schemaVersion) val tsInnerValWithTs = InnerValLikeWithTs.withLong(ts, ts, l.schemaVersion) val props = Map( - LabelMeta.degreeSeq -> InnerValLikeWithTs.withLong(10, ts, l.schemaVersion), - LabelMeta.timeStampSeq -> tsInnerValWithTs) + LabelMeta.degree -> InnerValLikeWithTs.withLong(10, ts, l.schemaVersion), + LabelMeta.timestamp -> tsInnerValWithTs) check(l, ts, to, props) } @@ -89,12 +92,12 @@ class IndexEdgeTest extends FunSuite with Matchers with TestCommonWithModels { } { val to = InnerVal.withLong(101, l.schemaVersion) val tsInnerValWithTs = InnerValLikeWithTs.withLong(ts, ts, l.schemaVersion) - val props = Map(LabelMeta.timeStampSeq -> tsInnerValWithTs, - 1.toByte -> InnerValLikeWithTs.withDouble(2.1, ts, l.schemaVersion), - LabelMeta.countSeq -> InnerValLikeWithTs.withLong(10, ts, l.schemaVersion)) + val props = Map(LabelMeta.timestamp -> tsInnerValWithTs, + testLabelMeta -> InnerValLikeWithTs.withDouble(2.1, ts, l.schemaVersion), + LabelMeta.count -> InnerValLikeWithTs.withLong(10, ts, l.schemaVersion)) check(l, ts, to, props) } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/66bdf1bc/s2rest_netty/src/main/scala/org/apache/s2graph/rest/netty/Server.scala ---------------------------------------------------------------------- diff --git a/s2rest_netty/src/main/scala/org/apache/s2graph/rest/netty/Server.scala b/s2rest_netty/src/main/scala/org/apache/s2graph/rest/netty/Server.scala index c8f65bf..a6f8f5c 100644 --- a/s2rest_netty/src/main/scala/org/apache/s2graph/rest/netty/Server.scala +++ b/s2rest_netty/src/main/scala/org/apache/s2graph/rest/netty/Server.scala @@ -33,20 +33,19 @@ import io.netty.handler.codec.http.HttpHeaders._ import io.netty.handler.codec.http._ import io.netty.handler.logging.{LogLevel, LoggingHandler} import io.netty.util.CharsetUtil -import org.apache.s2graph.core.GraphExceptions.BadQueryException +import org.apache.s2graph.core.GraphExceptions.{BadQueryException} import org.apache.s2graph.core.mysqls.Experiment import org.apache.s2graph.core.rest.RestHandler import org.apache.s2graph.core.rest.RestHandler.{CanLookup, HandlerResult} import org.apache.s2graph.core.utils.Extensions._ import org.apache.s2graph.core.utils.logger -import org.apache.s2graph.core.{Graph, JSONParser, PostProcess} +import org.apache.s2graph.core.{Graph, PostProcess} import play.api.libs.json._ import scala.collection.mutable import scala.concurrent.{ExecutionContext, Future} import scala.io.Source import scala.util.{Failure, Success, Try} -import scala.language.existentials class S2RestHandler(s2rest: RestHandler)(implicit ec: ExecutionContext) extends SimpleChannelInboundHandler[FullHttpRequest] { val ApplicationJson = "application/json" @@ -101,8 +100,9 @@ class S2RestHandler(s2rest: RestHandler)(implicit ec: ExecutionContext) extends result.body onComplete { case Success(json) => val duration = System.currentTimeMillis() - startedAt + val bucketName = result.headers.toMap.get(Experiment.ImpressionKey).getOrElse("") - val log = s"${req.getMethod} ${req.getUri} took ${duration} ms 200 ${s2rest.calcSize(json)} ${requestBody}" + val log = s"${req.getMethod} ${req.getUri} took ${duration} ms 200 ${s2rest.calcSize(json)} ${requestBody} ${bucketName}" logger.info(log) val buf: ByteBuf = Unpooled.copiedBuffer(json.toString, CharsetUtil.UTF_8) @@ -114,7 +114,7 @@ class S2RestHandler(s2rest: RestHandler)(implicit ec: ExecutionContext) extends case e: BadQueryException => logger.error(s"{$requestBody}, ${e.getMessage}", e) val buf: ByteBuf = Unpooled.copiedBuffer(PostProcess.badRequestResults(e).toString, CharsetUtil.UTF_8) - simpleResponse(ctx, Ok, byteBufOpt = Option(buf), channelFutureListenerOpt = CloseOpt, headers = headers.result()) + simpleResponse(ctx, BadRequest, byteBufOpt = Option(buf), channelFutureListenerOpt = CloseOpt, headers = headers.result()) case e: Exception => logger.error(s"${requestBody}, ${e.getMessage}", e) val buf: ByteBuf = Unpooled.copiedBuffer(PostProcess.emptyResults.toString, CharsetUtil.UTF_8) @@ -204,7 +204,9 @@ class S2RestHandler(s2rest: RestHandler)(implicit ec: ExecutionContext) extends } // Simple http server -object NettyServer { +object NettyServer extends App { + /** should be same with Boostrap.onStart on play */ + val numOfThread = Runtime.getRuntime.availableProcessors() val threadPool = Executors.newFixedThreadPool(numOfThread) val ec = ExecutionContext.fromExecutor(threadPool) @@ -234,31 +236,30 @@ object NettyServer { (new NioEventLoopGroup(1), new NioEventLoopGroup(), classOf[NioServerSocketChannel]) } - def main(args : scala.Array[scala.Predef.String]) : scala.Unit = { - try { - val b: ServerBootstrap = new ServerBootstrap() - .option(ChannelOption.SO_BACKLOG, Int.box(2048)) - b.group(bossGroup, workerGroup).channel(channelClass) - .handler(new LoggingHandler(LogLevel.INFO)) - .childHandler(new ChannelInitializer[SocketChannel] { - override def initChannel(ch: SocketChannel) { - val p = ch.pipeline() - p.addLast(new HttpServerCodec()) - p.addLast(new HttpObjectAggregator(maxBodySize)) - p.addLast(new S2RestHandler(rest)(ec)) - } - }) - - // for check server is started - logger.info(s"Listening for HTTP on /0.0.0.0:$port") - val ch: Channel = b.bind(port).sync().channel() - ch.closeFuture().sync() - - } finally { - bossGroup.shutdownGracefully() - workerGroup.shutdownGracefully() - s2graph.shutdown() - } + try { + val b: ServerBootstrap = new ServerBootstrap() + .option(ChannelOption.SO_BACKLOG, Int.box(2048)) + + b.group(bossGroup, workerGroup).channel(channelClass) + .handler(new LoggingHandler(LogLevel.INFO)) + .childHandler(new ChannelInitializer[SocketChannel] { + override def initChannel(ch: SocketChannel) { + val p = ch.pipeline() + p.addLast(new HttpServerCodec()) + p.addLast(new HttpObjectAggregator(maxBodySize)) + p.addLast(new S2RestHandler(rest)(ec)) + } + }) + + // for check server is started + logger.info(s"Listening for HTTP on /0.0.0.0:$port") + val ch: Channel = b.bind(port).sync().channel() + ch.closeFuture().sync() + + } finally { + bossGroup.shutdownGracefully() + workerGroup.shutdownGracefully() + s2graph.shutdown() } } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/66bdf1bc/s2rest_play/app/org/apache/s2graph/rest/play/controllers/AdminController.scala ---------------------------------------------------------------------- diff --git a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/AdminController.scala b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/AdminController.scala index 36610e0..087e12e 100644 --- a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/AdminController.scala +++ b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/AdminController.scala @@ -448,7 +448,7 @@ object AdminController extends Controller { // Management.createTable(cluster, hTableName, List("e", "v"), preSplitSize, hTableTTL, compressionAlgorithm) request.body.asJson.map(_.validate[HTableParams] match { case JsSuccess(hTableParams, _) => { - management.createTable(hTableParams.cluster, hTableParams.hTableName, List("e", "v"), + management.createStorageTable(hTableParams.cluster, hTableParams.hTableName, List("e", "v"), hTableParams.preSplitSize, hTableParams.hTableTTL, hTableParams.compressionAlgorithm.getOrElse(Management.DefaultCompressionAlgorithm)) logger.info(hTableParams.toString()) http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/66bdf1bc/s2rest_play/app/org/apache/s2graph/rest/play/controllers/EdgeController.scala ---------------------------------------------------------------------- diff --git a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/EdgeController.scala b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/EdgeController.scala index 8000cf8..835cc72 100644 --- a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/EdgeController.scala +++ b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/EdgeController.scala @@ -20,6 +20,7 @@ package org.apache.s2graph.rest.play.controllers import com.fasterxml.jackson.databind.JsonMappingException +import org.apache.s2graph.core.ExceptionHandler.KafkaMessage import org.apache.s2graph.core._ import org.apache.s2graph.core.mysqls.Label import org.apache.s2graph.core.rest.RequestParser @@ -35,7 +36,6 @@ import scala.concurrent.Future object EdgeController extends Controller { import ApplicationController._ - import ExceptionHandler._ import play.api.libs.concurrent.Execution.Implicits._ private val s2: Graph = org.apache.s2graph.rest.play.Global.s2graph @@ -232,8 +232,8 @@ object EdgeController extends Controller { else { s2.incrementCounts(edges, withWait = true).map { results => - val json = results.map { case (isSuccess, resultCount) => - Json.obj("success" -> isSuccess, "result" -> resultCount) + val json = results.map { case (isSuccess, resultCount, count) => + Json.obj("success" -> isSuccess, "result" -> resultCount, "_count" -> count) } jsonResponse(Json.toJson(json))
