Repository: incubator-s2graph
Updated Branches:
  refs/heads/master 8dbb9a3ee -> 66bdf1bc0


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/66bdf1bc/s2core/src/test/scala/org/apache/s2graph/core/storage/hbase/IndexEdgeTest.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/test/scala/org/apache/s2graph/core/storage/hbase/IndexEdgeTest.scala
 
b/s2core/src/test/scala/org/apache/s2graph/core/storage/hbase/IndexEdgeTest.scala
index ea90061..8e80fd9 100644
--- 
a/s2core/src/test/scala/org/apache/s2graph/core/storage/hbase/IndexEdgeTest.scala
+++ 
b/s2core/src/test/scala/org/apache/s2graph/core/storage/hbase/IndexEdgeTest.scala
@@ -19,15 +19,16 @@
 
 package org.apache.s2graph.core.storage.hbase
 
-import org.apache.s2graph.core.mysqls.{Label, LabelIndex, LabelMeta}
+import org.apache.s2graph.core.mysqls.{Model, Label, LabelIndex, LabelMeta}
 import org.apache.s2graph.core.types._
-import org.apache.s2graph.core.{IndexEdge, TestCommonWithModels, Vertex}
+import org.apache.s2graph.core.{QueryParam, IndexEdge, TestCommonWithModels, 
Vertex}
 import org.scalatest.{FunSuite, Matchers}
 
 
 class IndexEdgeTest extends FunSuite with Matchers with TestCommonWithModels {
   initTests()
 
+  val testLabelMeta = LabelMeta(Option(-1), labelV2.id.get, "test", 1.toByte, 
"0.0", "double")
   /**
    * check if storage serializer/deserializer can translate from/to bytes 
array.
    * @param l: label for edge.
@@ -35,17 +36,19 @@ class IndexEdgeTest extends FunSuite with Matchers with 
TestCommonWithModels {
    * @param to: to VertexId for edge.
    * @param props: expected props of edge.
    */
-  def check(l: Label, ts: Long, to: InnerValLike, props: Map[Byte, 
InnerValLikeWithTs]): Unit = {
+  def check(l: Label, ts: Long, to: InnerValLike, props: Map[LabelMeta, 
InnerValLikeWithTs]): Unit = {
     val from = InnerVal.withLong(1, l.schemaVersion)
     val vertexId = SourceVertexId(HBaseType.DEFAULT_COL_ID, from)
     val tgtVertexId = TargetVertexId(HBaseType.DEFAULT_COL_ID, to)
     val vertex = Vertex(vertexId, ts)
     val tgtVertex = Vertex(tgtVertexId, ts)
     val labelWithDir = LabelWithDirection(l.id.get, 0)
+    val labelOpt = Option(l)
+
+    val indexEdge = IndexEdge(vertex, tgtVertex, l, labelWithDir.dir, 0, ts, 
LabelIndex.DefaultSeq, props, tsInnerValOpt = Option(InnerVal.withLong(ts, 
l.schemaVersion)))
+    val _indexEdgeOpt = 
graph.getStorage(l).indexEdgeDeserializer(l.schemaVersion).fromKeyValues(labelOpt,
+      graph.getStorage(l).indexEdgeSerializer(indexEdge).toKeyValues, 
l.schemaVersion, None)
 
-    val indexEdge = IndexEdge(vertex, tgtVertex, labelWithDir, 0, ts, 
LabelIndex.DefaultSeq, props)
-    val _indexEdgeOpt = 
graph.storage.indexEdgeDeserializer(l.schemaVersion).fromKeyValues(queryParam,
-      graph.storage.indexEdgeSerializer(indexEdge).toKeyValues, 
l.schemaVersion, None)
 
     _indexEdgeOpt should not be empty
     indexEdge should be(_indexEdgeOpt.get)
@@ -60,8 +63,8 @@ class IndexEdgeTest extends FunSuite with Matchers with 
TestCommonWithModels {
     } {
       val to = InnerVal.withLong(101, l.schemaVersion)
       val tsInnerValWithTs = InnerValLikeWithTs.withLong(ts, ts, 
l.schemaVersion)
-      val props = Map(LabelMeta.timeStampSeq -> tsInnerValWithTs,
-        1.toByte -> InnerValLikeWithTs.withDouble(2.1, ts, l.schemaVersion))
+      val props = Map(LabelMeta.timestamp -> tsInnerValWithTs,
+        testLabelMeta -> InnerValLikeWithTs.withDouble(2.1, ts, 
l.schemaVersion))
 
       check(l, ts, to, props)
     }
@@ -75,8 +78,8 @@ class IndexEdgeTest extends FunSuite with Matchers with 
TestCommonWithModels {
       val to = InnerVal.withStr("0", l.schemaVersion)
       val tsInnerValWithTs = InnerValLikeWithTs.withLong(ts, ts, 
l.schemaVersion)
       val props = Map(
-        LabelMeta.degreeSeq -> InnerValLikeWithTs.withLong(10, ts, 
l.schemaVersion),
-        LabelMeta.timeStampSeq -> tsInnerValWithTs)
+        LabelMeta.degree -> InnerValLikeWithTs.withLong(10, ts, 
l.schemaVersion),
+        LabelMeta.timestamp -> tsInnerValWithTs)
 
       check(l, ts, to, props)
     }
@@ -89,12 +92,12 @@ class IndexEdgeTest extends FunSuite with Matchers with 
TestCommonWithModels {
     } {
       val to = InnerVal.withLong(101, l.schemaVersion)
       val tsInnerValWithTs = InnerValLikeWithTs.withLong(ts, ts, 
l.schemaVersion)
-      val props = Map(LabelMeta.timeStampSeq -> tsInnerValWithTs,
-        1.toByte -> InnerValLikeWithTs.withDouble(2.1, ts, l.schemaVersion),
-        LabelMeta.countSeq -> InnerValLikeWithTs.withLong(10, ts, 
l.schemaVersion))
+      val props = Map(LabelMeta.timestamp -> tsInnerValWithTs,
+        testLabelMeta -> InnerValLikeWithTs.withDouble(2.1, ts, 
l.schemaVersion),
+        LabelMeta.count -> InnerValLikeWithTs.withLong(10, ts, 
l.schemaVersion))
 
 
       check(l, ts, to, props)
     }
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/66bdf1bc/s2rest_netty/src/main/scala/org/apache/s2graph/rest/netty/Server.scala
----------------------------------------------------------------------
diff --git 
a/s2rest_netty/src/main/scala/org/apache/s2graph/rest/netty/Server.scala 
b/s2rest_netty/src/main/scala/org/apache/s2graph/rest/netty/Server.scala
index c8f65bf..a6f8f5c 100644
--- a/s2rest_netty/src/main/scala/org/apache/s2graph/rest/netty/Server.scala
+++ b/s2rest_netty/src/main/scala/org/apache/s2graph/rest/netty/Server.scala
@@ -33,20 +33,19 @@ import io.netty.handler.codec.http.HttpHeaders._
 import io.netty.handler.codec.http._
 import io.netty.handler.logging.{LogLevel, LoggingHandler}
 import io.netty.util.CharsetUtil
-import org.apache.s2graph.core.GraphExceptions.BadQueryException
+import org.apache.s2graph.core.GraphExceptions.{BadQueryException}
 import org.apache.s2graph.core.mysqls.Experiment
 import org.apache.s2graph.core.rest.RestHandler
 import org.apache.s2graph.core.rest.RestHandler.{CanLookup, HandlerResult}
 import org.apache.s2graph.core.utils.Extensions._
 import org.apache.s2graph.core.utils.logger
-import org.apache.s2graph.core.{Graph, JSONParser, PostProcess}
+import org.apache.s2graph.core.{Graph, PostProcess}
 import play.api.libs.json._
 
 import scala.collection.mutable
 import scala.concurrent.{ExecutionContext, Future}
 import scala.io.Source
 import scala.util.{Failure, Success, Try}
-import scala.language.existentials
 
 class S2RestHandler(s2rest: RestHandler)(implicit ec: ExecutionContext) 
extends SimpleChannelInboundHandler[FullHttpRequest] {
   val ApplicationJson = "application/json"
@@ -101,8 +100,9 @@ class S2RestHandler(s2rest: RestHandler)(implicit ec: 
ExecutionContext) extends
     result.body onComplete {
       case Success(json) =>
         val duration = System.currentTimeMillis() - startedAt
+        val bucketName = 
result.headers.toMap.get(Experiment.ImpressionKey).getOrElse("")
 
-        val log = s"${req.getMethod} ${req.getUri} took ${duration} ms 200 
${s2rest.calcSize(json)} ${requestBody}"
+        val log = s"${req.getMethod} ${req.getUri} took ${duration} ms 200 
${s2rest.calcSize(json)} ${requestBody} ${bucketName}"
         logger.info(log)
 
         val buf: ByteBuf = Unpooled.copiedBuffer(json.toString, 
CharsetUtil.UTF_8)
@@ -114,7 +114,7 @@ class S2RestHandler(s2rest: RestHandler)(implicit ec: 
ExecutionContext) extends
         case e: BadQueryException =>
           logger.error(s"{$requestBody}, ${e.getMessage}", e)
           val buf: ByteBuf = 
Unpooled.copiedBuffer(PostProcess.badRequestResults(e).toString, 
CharsetUtil.UTF_8)
-          simpleResponse(ctx, Ok, byteBufOpt = Option(buf), 
channelFutureListenerOpt = CloseOpt, headers = headers.result())
+          simpleResponse(ctx, BadRequest, byteBufOpt = Option(buf), 
channelFutureListenerOpt = CloseOpt, headers = headers.result())
         case e: Exception =>
           logger.error(s"${requestBody}, ${e.getMessage}", e)
           val buf: ByteBuf = 
Unpooled.copiedBuffer(PostProcess.emptyResults.toString, CharsetUtil.UTF_8)
@@ -204,7 +204,9 @@ class S2RestHandler(s2rest: RestHandler)(implicit ec: 
ExecutionContext) extends
 }
 
 // Simple http server
-object NettyServer {
+object NettyServer extends App {
+  /** should be same with Boostrap.onStart on play */
+
   val numOfThread = Runtime.getRuntime.availableProcessors()
   val threadPool = Executors.newFixedThreadPool(numOfThread)
   val ec = ExecutionContext.fromExecutor(threadPool)
@@ -234,31 +236,30 @@ object NettyServer {
       (new NioEventLoopGroup(1), new NioEventLoopGroup(), 
classOf[NioServerSocketChannel])
   }
 
-  def main(args : scala.Array[scala.Predef.String]) : scala.Unit = {
-    try {
-      val b: ServerBootstrap = new ServerBootstrap()
-        .option(ChannelOption.SO_BACKLOG, Int.box(2048))
-      b.group(bossGroup, workerGroup).channel(channelClass)
-        .handler(new LoggingHandler(LogLevel.INFO))
-        .childHandler(new ChannelInitializer[SocketChannel] {
-          override def initChannel(ch: SocketChannel) {
-            val p = ch.pipeline()
-            p.addLast(new HttpServerCodec())
-            p.addLast(new HttpObjectAggregator(maxBodySize))
-            p.addLast(new S2RestHandler(rest)(ec))
-          }
-        })
-
-      // for check server is started
-      logger.info(s"Listening for HTTP on /0.0.0.0:$port")
-      val ch: Channel = b.bind(port).sync().channel()
-      ch.closeFuture().sync()
-
-    } finally {
-      bossGroup.shutdownGracefully()
-      workerGroup.shutdownGracefully()
-      s2graph.shutdown()
-    }
+  try {
+    val b: ServerBootstrap = new ServerBootstrap()
+      .option(ChannelOption.SO_BACKLOG, Int.box(2048))
+
+    b.group(bossGroup, workerGroup).channel(channelClass)
+      .handler(new LoggingHandler(LogLevel.INFO))
+      .childHandler(new ChannelInitializer[SocketChannel] {
+        override def initChannel(ch: SocketChannel) {
+          val p = ch.pipeline()
+          p.addLast(new HttpServerCodec())
+          p.addLast(new HttpObjectAggregator(maxBodySize))
+          p.addLast(new S2RestHandler(rest)(ec))
+        }
+      })
+
+    // for check server is started
+    logger.info(s"Listening for HTTP on /0.0.0.0:$port")
+    val ch: Channel = b.bind(port).sync().channel()
+    ch.closeFuture().sync()
+
+  } finally {
+    bossGroup.shutdownGracefully()
+    workerGroup.shutdownGracefully()
+    s2graph.shutdown()
   }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/66bdf1bc/s2rest_play/app/org/apache/s2graph/rest/play/controllers/AdminController.scala
----------------------------------------------------------------------
diff --git 
a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/AdminController.scala
 
b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/AdminController.scala
index 36610e0..087e12e 100644
--- 
a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/AdminController.scala
+++ 
b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/AdminController.scala
@@ -448,7 +448,7 @@ object AdminController extends Controller {
     //    Management.createTable(cluster, hTableName, List("e", "v"), 
preSplitSize, hTableTTL, compressionAlgorithm)
     request.body.asJson.map(_.validate[HTableParams] match {
       case JsSuccess(hTableParams, _) => {
-        management.createTable(hTableParams.cluster, hTableParams.hTableName, 
List("e", "v"),
+        management.createStorageTable(hTableParams.cluster, 
hTableParams.hTableName, List("e", "v"),
           hTableParams.preSplitSize, hTableParams.hTableTTL,
           
hTableParams.compressionAlgorithm.getOrElse(Management.DefaultCompressionAlgorithm))
         logger.info(hTableParams.toString())

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/66bdf1bc/s2rest_play/app/org/apache/s2graph/rest/play/controllers/EdgeController.scala
----------------------------------------------------------------------
diff --git 
a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/EdgeController.scala 
b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/EdgeController.scala
index 8000cf8..835cc72 100644
--- 
a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/EdgeController.scala
+++ 
b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/EdgeController.scala
@@ -20,6 +20,7 @@
 package org.apache.s2graph.rest.play.controllers
 
 import com.fasterxml.jackson.databind.JsonMappingException
+import org.apache.s2graph.core.ExceptionHandler.KafkaMessage
 import org.apache.s2graph.core._
 import org.apache.s2graph.core.mysqls.Label
 import org.apache.s2graph.core.rest.RequestParser
@@ -35,7 +36,6 @@ import scala.concurrent.Future
 object EdgeController extends Controller {
 
   import ApplicationController._
-  import ExceptionHandler._
   import play.api.libs.concurrent.Execution.Implicits._
 
   private val s2: Graph = org.apache.s2graph.rest.play.Global.s2graph
@@ -232,8 +232,8 @@ object EdgeController extends Controller {
     else {
 
       s2.incrementCounts(edges, withWait = true).map { results =>
-        val json = results.map { case (isSuccess, resultCount) =>
-          Json.obj("success" -> isSuccess, "result" -> resultCount)
+        val json = results.map { case (isSuccess, resultCount, count) =>
+          Json.obj("success" -> isSuccess, "result" -> resultCount, "_count" 
-> count)
         }
 
         jsonResponse(Json.toJson(json))

Reply via email to