Repository: incubator-s2graph
Updated Branches:
  refs/heads/master f0f1081b1 -> 32eb344a7


naive implementation


Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/3ddea3f7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/3ddea3f7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/3ddea3f7

Branch: refs/heads/master
Commit: 3ddea3f78acad63e52ef855642390cb751d172b8
Parents: 33e3d26
Author: daewon <[email protected]>
Authored: Wed May 23 19:33:42 2018 +0900
Committer: daewon <[email protected]>
Committed: Thu May 24 13:33:05 2018 +0900

----------------------------------------------------------------------
 .../org/apache/s2graph/core/EdgeFetcher.scala   |   2 +-
 .../org/apache/s2graph/core/EdgeMutator.scala   |   2 +-
 .../org/apache/s2graph/core/Management.scala    |  93 +++---
 .../apache/s2graph/core/ResourceManager.scala   |  59 +++-
 .../scala/org/apache/s2graph/core/S2Graph.scala |   6 +-
 .../org/apache/s2graph/core/VertexFetcher.scala |   2 +-
 .../org/apache/s2graph/core/VertexMutator.scala |   2 +-
 .../core/storage/jdbc/JdbcEdgeFetcher.scala     |  80 ++++++
 .../core/storage/jdbc/JdbcEdgeMutator.scala     |  87 ++++++
 .../s2graph/core/storage/jdbc/JdbcStorage.scala | 177 ++++++++++++
 .../apache/s2graph/core/utils/Importer.scala    |   3 +-
 .../s2graph/core/utils/SafeUpdateCache.scala    |  21 +-
 .../s2graph/core/fetcher/BaseFetcherTest.scala  |  17 +-
 .../core/storage/jdbc/JdbcStorageTest.scala     | 281 +++++++++++++++++++
 .../org/apache/s2graph/graphql/TestGraph.scala  |   7 +-
 15 files changed, 767 insertions(+), 72 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/3ddea3f7/s2core/src/main/scala/org/apache/s2graph/core/EdgeFetcher.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/EdgeFetcher.scala 
b/s2core/src/main/scala/org/apache/s2graph/core/EdgeFetcher.scala
index c3760e0..82148c6 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/EdgeFetcher.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/EdgeFetcher.scala
@@ -24,7 +24,7 @@ import org.apache.s2graph.core.types.VertexId
 
 import scala.concurrent.{ExecutionContext, Future}
 
-trait EdgeFetcher {
+trait EdgeFetcher extends AutoCloseable {
 
   def init(config: Config)(implicit ec: ExecutionContext): Unit = {}
 

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/3ddea3f7/s2core/src/main/scala/org/apache/s2graph/core/EdgeMutator.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/EdgeMutator.scala 
b/s2core/src/main/scala/org/apache/s2graph/core/EdgeMutator.scala
index 252d129..db8f142 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/EdgeMutator.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/EdgeMutator.scala
@@ -24,7 +24,7 @@ import org.apache.s2graph.core.storage.MutateResponse
 
 import scala.concurrent.{ExecutionContext, Future}
 
-trait EdgeMutator {
+trait EdgeMutator extends AutoCloseable {
 
   def mutateStrongEdges(zkQuorum: String, _edges: Seq[S2EdgeLike], withWait: 
Boolean)(implicit ec: ExecutionContext): Future[Seq[Boolean]]
 

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/3ddea3f7/s2core/src/main/scala/org/apache/s2graph/core/Management.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/Management.scala 
b/s2core/src/main/scala/org/apache/s2graph/core/Management.scala
index c41e890..0c41ee3 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/Management.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/Management.scala
@@ -36,10 +36,11 @@ import scala.concurrent.{ExecutionContext, Future}
 import scala.util.Try
 
 /**
- * This is designed to be bridge between rest to s2core.
- * s2core never use this for finding models.
- */
+  * This is designed to be bridge between rest to s2core.
+  * s2core never use this for finding models.
+  */
 object Management {
+
   import HBaseType._
   import scala.collection.JavaConversions._
 
@@ -196,8 +197,8 @@ object Management {
       service <- Service.findByName(serviceName, useCache = false)
       serviceColumn <- ServiceColumn.find(service.id.get, columnName)
     } yield {
-        ColumnMeta.findOrInsert(serviceColumn.id.get, propsName, propsType, 
defaultValue, storeInGlobalIndex)
-      }
+      ColumnMeta.findOrInsert(serviceColumn.id.get, propsName, propsType, 
defaultValue, storeInGlobalIndex)
+    }
     result.getOrElse({
       throw new RuntimeException(s"add property on vertex failed")
     })
@@ -231,11 +232,11 @@ object Management {
       (k, v) <- js.fields
       meta <- column.metasInvMap.get(k)
     } yield {
-        val innerVal = jsValueToInnerVal(v, meta.dataType, 
column.schemaVersion).getOrElse(
-          throw new RuntimeException(s"$k is not defined. create schema for 
vertex."))
+      val innerVal = jsValueToInnerVal(v, meta.dataType, 
column.schemaVersion).getOrElse(
+        throw new RuntimeException(s"$k is not defined. create schema for 
vertex."))
 
-        (meta.seq.toInt, innerVal)
-      }
+      (meta.seq.toInt, innerVal)
+    }
     props
 
   }
@@ -279,20 +280,21 @@ object Management {
       Label.updateName(tempLabel, rightLabel)
     }
   }
+
   def toConfig(params: Map[String, Any]): Config = {
     import scala.collection.JavaConversions._
 
     val filtered = params.filter { case (k, v) =>
-        v match {
-          case None => false
-          case _ => true
-        }
+      v match {
+        case None => false
+        case _ => true
+      }
     }.map { case (k, v) =>
-        val newV = v match {
-          case Some(value) => value
-          case _ => v
-        }
-        k -> newV
+      val newV = v match {
+        case Some(value) => value
+        case _ => v
+      }
+      k -> newV
     }
 
     ConfigFactory.parseMap(filtered)
@@ -353,13 +355,13 @@ class Management(graph: S2GraphLike) {
   }
 
   def createStorageTable(zkAddr: String,
-                  tableName: String,
-                  cfs: List[String],
-                  regionMultiplier: Int,
-                  ttl: Option[Int],
-                  compressionAlgorithm: String = DefaultCompressionAlgorithm,
-                  replicationScopeOpt: Option[Int] = None,
-                  totalRegionCount: Option[Int] = None): Unit = {
+                         tableName: String,
+                         cfs: List[String],
+                         regionMultiplier: Int,
+                         ttl: Option[Int],
+                         compressionAlgorithm: String = 
DefaultCompressionAlgorithm,
+                         replicationScopeOpt: Option[Int] = None,
+                         totalRegionCount: Option[Int] = None): Unit = {
     val config = toConfig(Map(
       ZookeeperQuorum -> zkAddr,
 //      ColumnFamilies -> cfs,
@@ -374,11 +376,11 @@ class Management(graph: S2GraphLike) {
 
   /** HBase specific code */
   def createService(serviceName: String,
-                   cluster: String,
-                   hTableName: String,
-                   preSplitSize: Int,
-                   hTableTTL: Int,
-                   compressionAlgorithm: String): Service = {
+                    cluster: String,
+                    hTableName: String,
+                    preSplitSize: Int,
+                    hTableTTL: Int,
+                    compressionAlgorithm: String): Service = {
     createService(serviceName, cluster, hTableName, preSplitSize,
       Option(hTableTTL).filter(_ > -1), compressionAlgorithm).get
   }
@@ -439,6 +441,7 @@ class Management(graph: S2GraphLike) {
 
     serviceColumnTry.get
   }
+
   def createLabel(labelName: String,
                   srcColumn: ServiceColumn,
                   tgtColumn: ServiceColumn,
@@ -451,7 +454,8 @@ class Management(graph: S2GraphLike) {
                   hTableTTL: Int,
                   schemaVersion: String,
                   compressionAlgorithm: String,
-                  options: String): Label = {
+                  options: String
+                 ): Label = {
     import scala.collection.JavaConversions._
 
     createLabel(labelName,
@@ -481,13 +485,15 @@ class Management(graph: S2GraphLike) {
                   schemaVersion: String = DEFAULT_VERSION,
                   isAsync: Boolean = false,
                   compressionAlgorithm: String = "gz",
-                  options: Option[String] = None): Try[Label] = {
+                  options: Option[String] = None,
+                  initFetcherWithOptions: Boolean = false
+                 ): Try[Label] = {
 
-    if (label.length > LABEL_NAME_MAX_LENGTH ) throw new 
LabelNameTooLongException(s"Label name ${label} too long.( max length : 
${LABEL_NAME_MAX_LENGTH}} )")
+    if (label.length > LABEL_NAME_MAX_LENGTH) throw new 
LabelNameTooLongException(s"Label name ${label} too long.( max length : 
${LABEL_NAME_MAX_LENGTH}} )")
     if (hTableName.isEmpty && hTableTTL.isDefined) throw new 
RuntimeException("if want to specify ttl, give hbaseTableName also")
 
     val labelOpt = Label.findByName(label, useCache = false)
-    Schema withTx { implicit session =>
+    val newLabelTry = Schema withTx { implicit session =>
       if (labelOpt.isDefined) throw new LabelAlreadyExistException(s"Label 
name ${label} already exist.")
 
       /* create all models */
@@ -509,20 +515,27 @@ class Management(graph: S2GraphLike) {
       ))
       storage.createTable(config, newLabel.hbaseTableName)
 
-      updateEdgeFetcher(newLabel, None)
-      updateEdgeFetcher(newLabel, None)
-
       newLabel
     }
+
+    newLabelTry.foreach { newLabel =>
+      if (initFetcherWithOptions) {
+        updateEdgeFetcher(newLabel, options)
+      } else {
+        updateEdgeFetcher(newLabel, None)
+      }
+    }
+
+    newLabelTry
   }
 
   /**
    * label
    */
   /**
-   * copy label when if oldLabel exist and newLabel do not exist.
-   * copy label: only used by bulk load job. not sure if we need to 
parameterize hbase cluster.
-   */
+    * copy label when if oldLabel exist and newLabel do not exist.
+    * copy label: only used by bulk load job. not sure if we need to 
parameterize hbase cluster.
+    */
   def copyLabel(oldLabelName: String, newLabelName: String, hTableName: 
Option[String]): Try[Label] = {
     val old = Label.findByName(oldLabelName, useCache = false).getOrElse(throw 
new LabelNotExistException(s"Old label $oldLabelName not exists."))
 

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/3ddea3f7/s2core/src/main/scala/org/apache/s2graph/core/ResourceManager.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/main/scala/org/apache/s2graph/core/ResourceManager.scala 
b/s2core/src/main/scala/org/apache/s2graph/core/ResourceManager.scala
index a8a3a34..423b3c9 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/ResourceManager.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/ResourceManager.scala
@@ -19,10 +19,15 @@
 
 package org.apache.s2graph.core
 
-import com.typesafe.config.{Config, ConfigFactory}
+import com.typesafe.config.impl.ConfigImpl
+import com.typesafe.config._
 import org.apache.s2graph.core.schema.{Label, ServiceColumn}
-import org.apache.s2graph.core.utils.SafeUpdateCache
+import org.apache.s2graph.core.utils.{SafeUpdateCache, logger}
+
 import scala.concurrent.ExecutionContext
+import scala.reflect.ClassTag
+import scala.tools.nsc.typechecker.StructuredTypeStrings
+import scala.util.Try
 
 
 object ResourceManager {
@@ -32,14 +37,19 @@ object ResourceManager {
   import scala.collection.JavaConverters._
 
   val ClassNameKey = "className"
-  val EdgeFetcherKey = classOf[EdgeFetcher].getClass().getName
+  val MaxSizeKey = "resource.cache.max.size"
+  val CacheTTL = "resource.cache.ttl.seconds"
+
+  val EdgeFetcherKey = classOf[EdgeFetcher].getName
 
-  val VertexFetcherKey = classOf[VertexFetcher].getClass().getName
+  val VertexFetcherKey = classOf[VertexFetcher].getName
 
-  val EdgeMutatorKey = classOf[EdgeMutator].getClass.getName
-  val VertexMutatorKey = classOf[VertexMutator].getClass.getName
+  val EdgeMutatorKey = classOf[EdgeMutator].getName
+  val VertexMutatorKey = classOf[VertexMutator].getName
 
-  val DefaultConfig = ConfigFactory.parseMap(Map(MaxSizeKey -> 1000, TtlKey -> 
-1).asJava)
+  val DefaultMaxSize = 1000
+  val DefaultCacheTTL = -1
+  val DefaultConfig = ConfigFactory.parseMap(Map(MaxSizeKey -> DefaultMaxSize, 
TtlKey -> DefaultCacheTTL).asJava)
 }
 
 class ResourceManager(graph: S2GraphLike,
@@ -49,7 +59,10 @@ class ResourceManager(graph: S2GraphLike,
 
   import scala.collection.JavaConverters._
 
-  val cache = new SafeUpdateCache(_config)
+  val maxSize = 
Try(_config.getInt(ResourceManager.MaxSizeKey)).getOrElse(DefaultMaxSize)
+  val cacheTTL = 
Try(_config.getInt(ResourceManager.CacheTTL)).getOrElse(DefaultCacheTTL)
+
+  val cache = new SafeUpdateCache(maxSize, cacheTTL)
 
   def getAllVertexFetchers(): Seq[VertexFetcher] = {
     cache.asMap().asScala.toSeq.collect { case (_, (obj: VertexFetcher, _, _)) 
=> obj }
@@ -59,10 +72,22 @@ class ResourceManager(graph: S2GraphLike,
     cache.asMap().asScala.toSeq.collect { case (_, (obj: EdgeFetcher, _, _)) 
=> obj }
   }
 
+  /**
+    * Callback passed to `cache.withCache` for values leaving the cache: closes the
+    * value when it can be closed and logs the eviction.
+    *
+    * The values cached by this class are `Option`s of fetchers/mutators, so an `Option`
+    * is unwrapped before its content is closed. A value (or `Option` content) that is
+    * not `AutoCloseable` is only reported in the log.
+    *
+    * @param oldValue the value being dropped from the cache
+    */
+  def onEvict(oldValue: AnyRef): Unit = {
+    def reportNotCloseable(value: Any): Unit =
+      logger.info(s"Class does't have close() method ${value.getClass.getName}")
+
+    oldValue match {
+      case o: Option[_] => o.foreach {
+        case v: AutoCloseable => v.close()
+        // Without this case a non-closeable payload would raise a MatchError.
+        case other => reportNotCloseable(other)
+      }
+      case v: AutoCloseable => v.close()
+      case _ => reportNotCloseable(oldValue)
+    }
+
+    logger.info(s"[${oldValue.getClass.getName}]: $oldValue evicted.")
+  }
+
   def getOrElseUpdateVertexFetcher(column: ServiceColumn,
                                    cacheTTLInSecs: Option[Int] = None): 
Option[VertexFetcher] = {
     val cacheKey = VertexFetcherKey + "_" + column.service.serviceName + "_" + 
column.columnName
-    cache.withCache(cacheKey, false, cacheTTLInSecs) {
+    cache.withCache(cacheKey, false, cacheTTLInSecs, onEvict) {
       column.toFetcherConfig.map { fetcherConfig =>
         val className = fetcherConfig.getString(ClassNameKey)
         val fetcher = Class.forName(className)
@@ -81,7 +106,7 @@ class ResourceManager(graph: S2GraphLike,
                                  cacheTTLInSecs: Option[Int] = None): 
Option[EdgeFetcher] = {
     val cacheKey = EdgeFetcherKey + "_" + label.label
 
-    cache.withCache(cacheKey, false, cacheTTLInSecs) {
+    cache.withCache(cacheKey, false, cacheTTLInSecs, onEvict) {
       label.toFetcherConfig.map { fetcherConfig =>
         val className = fetcherConfig.getString(ClassNameKey)
         val fetcher = Class.forName(className)
@@ -89,7 +114,10 @@ class ResourceManager(graph: S2GraphLike,
           .newInstance(graph)
           .asInstanceOf[EdgeFetcher]
 
-        fetcher.init(fetcherConfig)
+        fetcher.init(
+          fetcherConfig
+            .withValue("labelName", ConfigValueFactory.fromAnyRef(label.label))
+        )
 
         fetcher
       }
@@ -99,7 +127,7 @@ class ResourceManager(graph: S2GraphLike,
   def getOrElseUpdateVertexMutator(column: ServiceColumn,
                                    cacheTTLInSecs: Option[Int] = None): 
Option[VertexMutator] = {
     val cacheKey = VertexMutatorKey + "_" + column.service.serviceName + "_" + 
column.columnName
-    cache.withCache(cacheKey, false, cacheTTLInSecs) {
+    cache.withCache(cacheKey, false, cacheTTLInSecs, onEvict) {
       column.toMutatorConfig.map { mutatorConfig =>
         val className = mutatorConfig.getString(ClassNameKey)
         val fetcher = Class.forName(className)
@@ -117,7 +145,7 @@ class ResourceManager(graph: S2GraphLike,
   def getOrElseUpdateEdgeMutator(label: Label,
                                  cacheTTLInSecs: Option[Int] = None): 
Option[EdgeMutator] = {
     val cacheKey = EdgeMutatorKey + "_" + label.label
-    cache.withCache(cacheKey, false, cacheTTLInSecs) {
+    cache.withCache(cacheKey, false, cacheTTLInSecs, onEvict) {
       label.toMutatorConfig.map { mutatorConfig =>
         val className = mutatorConfig.getString(ClassNameKey)
         val fetcher = Class.forName(className)
@@ -125,7 +153,10 @@ class ResourceManager(graph: S2GraphLike,
           .newInstance(graph)
           .asInstanceOf[EdgeMutator]
 
-        fetcher.init(mutatorConfig)
+        fetcher.init(
+          mutatorConfig
+            .withValue("labelName", ConfigValueFactory.fromAnyRef(label.label))
+        )
 
         fetcher
       }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/3ddea3f7/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala 
b/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
index 651323f..2ff2eb3 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
@@ -63,6 +63,8 @@ object S2Graph {
     "db.default.user" -> "graph",
     "cache.max.size" -> java.lang.Integer.valueOf(0),
     "cache.ttl.seconds" -> java.lang.Integer.valueOf(-1),
+    "resource.cache.max.size" -> java.lang.Integer.valueOf(1000),
+    "resource.cache.ttl.seconds" -> java.lang.Integer.valueOf(-1),
     "hbase.client.retries.number" -> java.lang.Integer.valueOf(20),
     "hbase.rpcs.buffered_flush_interval" -> 
java.lang.Short.valueOf(100.toShort),
     "hbase.rpc.timeout" -> java.lang.Integer.valueOf(600000),
@@ -282,12 +284,12 @@ class S2Graph(_config: Config)(implicit val ec: 
ExecutionContext) extends S2Grap
   *  right implementation(S2GRAPH-213).
   * */
   override def getVertexFetcher(column: ServiceColumn): VertexFetcher = {
-    resourceManager.getOrElseUpdateVertexFetcher(column)
+    resourceManager.getOrElseUpdateVertexFetcher(column, Option(60 * 60 * 24))
       .getOrElse(defaultStorage.vertexFetcher)
   }
 
   override def getEdgeFetcher(label: Label): EdgeFetcher = {
-    resourceManager.getOrElseUpdateEdgeFetcher(label)
+    resourceManager.getOrElseUpdateEdgeFetcher(label, Option(60 * 60 * 24))
       .getOrElse(defaultStorage.edgeFetcher)
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/3ddea3f7/s2core/src/main/scala/org/apache/s2graph/core/VertexFetcher.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/VertexFetcher.scala 
b/s2core/src/main/scala/org/apache/s2graph/core/VertexFetcher.scala
index b641e7f..4addcab 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/VertexFetcher.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/VertexFetcher.scala
@@ -24,7 +24,7 @@ import org.apache.s2graph.core.types.VertexId
 
 import scala.concurrent.{ExecutionContext, Future}
 
-trait VertexFetcher {
+trait VertexFetcher extends AutoCloseable {
   def init(config: Config)(implicit ec: ExecutionContext): Unit = {}
 
   def fetchVertices(vertices: Seq[S2VertexLike])(implicit ec: 
ExecutionContext): Future[Seq[S2VertexLike]]

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/3ddea3f7/s2core/src/main/scala/org/apache/s2graph/core/VertexMutator.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/VertexMutator.scala 
b/s2core/src/main/scala/org/apache/s2graph/core/VertexMutator.scala
index d1c8ecf..a7c095a 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/VertexMutator.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/VertexMutator.scala
@@ -24,7 +24,7 @@ import org.apache.s2graph.core.storage.MutateResponse
 
 import scala.concurrent.{ExecutionContext, Future}
 
-trait VertexMutator {
+trait VertexMutator extends AutoCloseable {
   def close(): Unit = {}
 
   def init(config: Config)(implicit ec: ExecutionContext): Unit = {}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/3ddea3f7/s2core/src/main/scala/org/apache/s2graph/core/storage/jdbc/JdbcEdgeFetcher.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/main/scala/org/apache/s2graph/core/storage/jdbc/JdbcEdgeFetcher.scala
 
b/s2core/src/main/scala/org/apache/s2graph/core/storage/jdbc/JdbcEdgeFetcher.scala
new file mode 100644
index 0000000..927635e
--- /dev/null
+++ 
b/s2core/src/main/scala/org/apache/s2graph/core/storage/jdbc/JdbcEdgeFetcher.scala
@@ -0,0 +1,80 @@
+package org.apache.s2graph.core.storage.jdbc
+
+import com.typesafe.config.Config
+import org.apache.s2graph.core.types.VertexId
+import org.apache.s2graph.core._
+import org.apache.s2graph.core.schema.{Label, LabelMeta}
+import scalikejdbc.{ConnectionPool, ConnectionPoolSettings}
+
+import scala.concurrent.{ExecutionContext, Future}
+import scalikejdbc._
+
+class JdbcEdgeFetcher(graph: S2GraphLike) extends EdgeFetcher {
+
+  import JdbcStorage._
+
+  var selectQuery: SQLSyntax = _
+
+  /**
+    * Prepares this fetcher for the label named by the "labelName" config entry:
+    * builds the SELECT prefix kept in `selectQuery`, sets up the JDBC connection pool
+    * and creates the label's table when it is not present yet.
+    *
+    * @param config fetcher options; must contain "labelName" plus the settings read by
+    *               `JdbcStorage.prepareConnection`
+    * @throws NoSuchElementException when no label with that name exists
+    */
+  override def init(config: Config)(implicit ec: ExecutionContext): Unit = {
+    val labelName = config.getString("labelName")
+    // Fail with a message naming the missing label rather than a bare "None.get".
+    val label = Label.findByName(labelName, useCache = false).getOrElse(
+      throw new NoSuchElementException(s"Label $labelName does not exist."))
+
+    selectQuery = SQLSyntax.createUnsafely(
+      s"SELECT ${affectedColumns(label).mkString(", ")} FROM ${labelTableName(label)}"
+    )
+
+    JdbcStorage.prepareConnection(config)
+    createTable(label)
+  }
+
+  /**
+    * Converts one fetched row into an edge of the given label.
+    *
+    * Row values are looked up by upper-cased column name. For the "out" direction the
+    * edge runs `_from` -> `_to`; for any other direction the two endpoints are swapped.
+    *
+    * @param metas     label metas whose columns become the edge properties
+    * @param labelName name of the label the edge belongs to
+    * @param direction query direction; "out" keeps the stored endpoint order
+    * @param rs        column name -> value map of the row
+    */
+  def toEdge(metas: Seq[LabelMeta], labelName: String, direction: String, rs: Map[String, Any]): S2EdgeLike = {
+    def column(name: String): Any = rs(name.toUpperCase())
+
+    val from = column("_from")
+    val to = column("_to")
+    val timestamp = column("_timestamp").asInstanceOf[java.sql.Timestamp].millis
+    val props = metas.map(meta => meta.name -> column(meta.name)).toMap
+
+    val (src, tgt) = if (direction == "out") (from, to) else (to, from)
+    graph.toEdge(src, tgt, labelName, direction, props, timestamp)
+  }
+
+  /**
+    * Answers each query request with one SELECT on the label table, ordered by
+    * `_timestamp` descending and windowed by the request's offset and limit.
+    *
+    * The request vertex is matched against `_from` for the "out" direction and against
+    * `_to` otherwise. Every resulting edge gets the score 1.0. The queries run
+    * synchronously; the result is returned as an already-completed future.
+    *
+    * @param queryRequests requests to execute, answered in order
+    * @param prevStepEdges not used by this implementation
+    */
+  override def fetches(queryRequests: Seq[QueryRequest], prevStepEdges: Map[VertexId, Seq[EdgeWithScore]])(implicit ec: ExecutionContext): Future[Seq[StepResult]] = {
+    val orderColumn = SQLSyntax.createUnsafely("_timestamp")
+
+    withReadOnlySession { implicit session =>
+      val stepResults = queryRequests.map { queryRequest =>
+        val vertex = queryRequest.vertex
+        val queryParam = queryRequest.queryParam
+        val label = queryParam.label
+        val metas = label.metas()
+
+        val limit = queryParam.limit
+        val offset = queryParam.offset
+        val vertexId = vertex.innerId.toIdString()
+
+        // Only the column compared with the vertex id depends on the direction.
+        val keyColumn = SQLSyntax.createUnsafely(if (queryParam.direction == "out") "_from" else "_to")
+
+        val rows =
+          sql"""${selectQuery} WHERE ${keyColumn} = ${vertexId} ORDER BY ${orderColumn} DESC offset ${offset} limit ${limit}"""
+            .map(Row.apply).list().apply()
+
+        val scoredEdges = rows.map { row =>
+          EdgeWithScore(toEdge(metas, label.label, queryParam.direction, row.row), 1.0, queryParam.label)
+        }
+
+        StepResult(scoredEdges, Nil, Nil)
+      }
+
+      Future.successful(stepResults)
+    }
+  }
+
+  override def fetchEdgesAll()(implicit ec: ExecutionContext): 
Future[Seq[S2EdgeLike]] = ??? // not implemented for the JDBC fetcher; `???` throws NotImplementedError when called
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/3ddea3f7/s2core/src/main/scala/org/apache/s2graph/core/storage/jdbc/JdbcEdgeMutator.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/main/scala/org/apache/s2graph/core/storage/jdbc/JdbcEdgeMutator.scala
 
b/s2core/src/main/scala/org/apache/s2graph/core/storage/jdbc/JdbcEdgeMutator.scala
new file mode 100644
index 0000000..96c214b
--- /dev/null
+++ 
b/s2core/src/main/scala/org/apache/s2graph/core/storage/jdbc/JdbcEdgeMutator.scala
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.s2graph.core.storage.jdbc
+
+import com.typesafe.config.Config
+import org.apache.s2graph.core._
+import org.apache.s2graph.core.schema.Label
+import org.apache.s2graph.core.storage.MutateResponse
+import org.joda.time.DateTime
+import scalikejdbc._
+
+import scala.concurrent.{ExecutionContext, Future}
+
+class JdbcEdgeMutator(graph: S2GraphLike) extends EdgeMutator {
+
+  import JdbcStorage._
+
+  /**
+    * Prepares this mutator for the label named by the "labelName" config entry: sets up
+    * the JDBC connection pool and creates the label's table when it is not present yet.
+    *
+    * @param config mutator options; must contain "labelName" plus the settings read by
+    *               `JdbcStorage.prepareConnection`
+    * @throws NoSuchElementException when no label with that name exists
+    */
+  override def init(config: Config)(implicit ec: ExecutionContext): Unit = {
+    val labelName = config.getString("labelName")
+    // Fail with a message naming the missing label rather than a bare "None.get".
+    val label = Label.findByName(labelName, useCache = false).getOrElse(
+      throw new NoSuchElementException(s"Label $labelName does not exist."))
+
+    JdbcStorage.prepareConnection(config)
+    createTable(label)
+  }
+
+  /**
+    * Writes the given edges with one batched `MERGE INTO ... KEY ...` statement per label.
+    *
+    * The merge key is (`_from`, `_to`) for labels whose consistency level is "strong"
+    * and (`_from`, `_to`, `_timestamp`) otherwise. The statements run synchronously; the
+    * returned future is already completed and reports `true` for every input edge.
+    *
+    * @param zkQuorum not used by this implementation
+    * @param _edges   edges to write; they are grouped by their label
+    * @param withWait not used by this implementation
+    */
+  override def mutateStrongEdges(zkQuorum: String, _edges: Seq[S2EdgeLike], withWait: Boolean)(implicit ec: ExecutionContext): Future[Seq[Boolean]] = {
+    _edges.groupBy(_.innerLabel).foreach { case (label, labelEdges) =>
+      val columnNames = JdbcStorage.affectedColumns(label)
+
+      // One parameter row per edge, laid out in the same order as `columnNames`.
+      val batchParams = labelEdges.map { edge =>
+        columnNames.map {
+          case "_timestamp" => new DateTime(edge.ts)
+          case "_from" => edge.srcForVertex.innerId.value
+          case "_to" => edge.tgtForVertex.innerId.value
+          case k: String => edge.propertyValue(k).map(iv => iv.innerVal.value).orNull
+        }
+      }
+
+      val placeholders = columnNames.map(_ => "?").mkString(", ")
+      val keyColumns = label.consistencyLevel match {
+        case "strong" => "(_from, _to)"
+        case _ => "(_from, _to, _timestamp)"
+      }
+
+      val mergeSql = SQLSyntax.createUnsafely(
+        s"""MERGE INTO ${JdbcStorage.labelTableName(label)} (${columnNames.mkString(", ")}) KEY ${keyColumns} VALUES (${placeholders})""".stripMargin
+      )
+
+      withTxSession { session =>
+        sql"""${mergeSql}""".batch(batchParams: _*).apply()(session)
+      }
+    }
+
+    Future.successful(_edges.map(_ => true))
+  }
+
+  override def mutateWeakEdges(zkQuorum: String, _edges: Seq[S2EdgeLike], 
withWait: Boolean)(implicit ec: ExecutionContext): Future[Seq[(Int, Boolean)]] 
= ??? // not implemented; `???` throws NotImplementedError when called
+
+  override def incrementCounts(zkQuorum: String, edges: Seq[S2EdgeLike], 
withWait: Boolean)(implicit ec: ExecutionContext): Future[Seq[MutateResponse]] 
= ??? // not implemented; `???` throws NotImplementedError when called
+
+  override def updateDegree(zkQuorum: String, edge: S2EdgeLike, degreeVal: 
Long)(implicit ec: ExecutionContext): Future[MutateResponse] = ??? // not implemented; `???` throws NotImplementedError when called
+
+  override def deleteAllFetchedEdgesAsyncOld(stepInnerResult: StepResult, 
requestTs: Long, retryNum: Int)(implicit ec: ExecutionContext): Future[Boolean] 
= ??? // not implemented; `???` throws NotImplementedError when called
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/3ddea3f7/s2core/src/main/scala/org/apache/s2graph/core/storage/jdbc/JdbcStorage.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/main/scala/org/apache/s2graph/core/storage/jdbc/JdbcStorage.scala 
b/s2core/src/main/scala/org/apache/s2graph/core/storage/jdbc/JdbcStorage.scala
new file mode 100644
index 0000000..cf47e41
--- /dev/null
+++ 
b/s2core/src/main/scala/org/apache/s2graph/core/storage/jdbc/JdbcStorage.scala
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.s2graph.core.storage.jdbc
+
+import com.typesafe.config.Config
+import org.apache.s2graph.core._
+import org.apache.s2graph.core.schema._
+import org.apache.s2graph.core.storage.{Storage, StorageManagement, 
StorageSerDe}
+import org.apache.s2graph.core.utils.logger
+import scalikejdbc._
+
+object JdbcStorage {
+
+  case class Row(row: Map[String, Any]) // one fetched row as a column-name -> value map
+
+  object Row {
+    def apply(rs: WrappedResultSet): Row = Row(rs.toMap) // builds a Row from the current result-set row via `toMap`
+  }
+
+  val PoolName = "_META_JDBC_STORAGE_" // name under which the connection pool is registered and borrowed
+  val TablePrefix = "_EDGE_STORE" // prefix of every label table name, see labelTableName
+
+  /**
+    * Maps an s2graph data type name to the SQL column type used in the generated DDL.
+    *
+    * 64-bit integer names ("long", "int64", ...) map to `bigint` and "float64" maps to
+    * `double`: in MySQL `int(64)` is still a 32-bit integer (the number is only a
+    * display width) and `float` is single precision, so 64-bit values would not fit.
+    *
+    * @param tpe data type name such as "string", "long" or "double"
+    * @return the SQL column type, or "" when `tpe` is not one of the known names
+    */
+  def toMySqlType(tpe: String): String = tpe match {
+    case "int" | "integer" | "i" | "int32" | "integer32" => "int(32)"
+    case "string" | "str" | "s" => "varchar(256)"
+    case "boolean" | "bool" => "tinyint(1)"
+    case "long" | "l" | "int64" | "integer64" => "bigint"
+    case "float" | "f" | "float32" => "float"
+    case "double" | "d" | "float64" => "double"
+    case _ => ""
+  }
+
+  /**
+    * Renders the `KEY` clauses of a label table, each one followed by a comma.
+    *
+    * Without any index a single key on `_timestamp` named `<label>_PK` is produced;
+    * otherwise there is one key per index, named `<label>_<index name>`, over the
+    * index's property names.
+    *
+    * @param label   label the table belongs to
+    * @param indices indices defined on the label
+    */
+  def buildIndex(label: Label, indices: List[LabelIndex]): String = indices match {
+    case Nil =>
+      s"KEY `${label.label}_PK` (`_timestamp`)" + ","
+    case _ =>
+      val keyClauses = indices.map { index =>
+        val quotedFields = index.propNames.toList.map(n => s"`${n}`")
+        s"KEY `${label.label}_${index.name}` (${quotedFields.mkString(", ")})"
+      }
+
+      keyClauses.mkString("", ",\n", ",")
+  }
+
+  /**
+    * Renders one column definition per label meta, each followed by a comma.
+    *
+    * @param metas label metas to turn into columns
+    * @return the column definitions, or "" when there are no metas
+    */
+  def buildProps(metas: List[LabelMeta]): String = metas match {
+    case Nil => ""
+    case _ =>
+      val columnDefs = metas.map { meta =>
+        s"""`${meta.name}` ${toMySqlType(meta.dataType)}""".trim
+      }
+
+      columnDefs.mkString("", ",\n  ", ",").trim
+  }
+
+  /**
+    * Renders the `CREATE TABLE` statement of the table that stores the edges of `label`.
+    *
+    * The table has an auto-increment `id`, the `_timestamp`, `_from` and `_to` columns,
+    * one column per label meta (sorted by name) and the keys produced by `buildIndex`.
+    * When the label's consistency level is "strong" the two keys over `_from`/`_to`
+    * are declared UNIQUE.
+    *
+    * @param label label whose edge table is described
+    * @return the DDL text
+    */
+  def showSchema(label: Label): String = {
+    val srcColumn = label.srcColumn
+    // `_to` stores target vertex ids, so its type has to come from the target column.
+    val tgtColumn = label.tgtColumn
+    val metas = LabelMeta.findAllByLabelId(label.id.get, useCache = false).sortBy(_.name)
+    val consistency = label.consistencyLevel
+    val indices = label.indices
+    val isUnique = if (consistency == "strong") "UNIQUE" else ""
+
+    s"""
+       |CREATE TABLE `${labelTableName(label)}`(
+       |  `id` int(11) NOT NULL AUTO_INCREMENT,
+       |  `_timestamp` TIMESTAMP NOT NULL default CURRENT_TIMESTAMP,
+       |  `_from` ${toMySqlType(srcColumn.columnType)} NOT NULL,
+       |  `_to` ${toMySqlType(tgtColumn.columnType)} NOT NULL,
+       |  PRIMARY KEY (`id`),
+       |  ${buildProps(metas)}
+       |  ${buildIndex(label, indices)}
+       |  ${isUnique} KEY `${label.label}_from` (`_from`,`_to`),
+       |  ${isUnique} KEY `${label.label}_to` (`_to`,`_from`)
+       |) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
+  """.trim.stripMargin
+  }
+
+  def labelTableName(label: Label): String = s"${TablePrefix}_${label.label}" // table of a label: TablePrefix, an underscore, then the label name
+
+  /**
+    * Lists the columns read and written for a label: the fixed `_timestamp`, `_from`
+    * and `_to` columns followed by the label's meta names in alphabetical order.
+    * The metas are looked up without the cache on every call.
+    */
+  def affectedColumns(label: Label): Seq[String] = {
+    val propertyColumns = LabelMeta
+      .findAllByLabelId(label.id.get, useCache = false)
+      .sortBy(_.name)
+      .map(_.name)
+
+    Seq("_timestamp", "_from", "_to") ++ propertyColumns
+  }
+
+  /**
+    * Runs `f` with a session inside `localTx` on a connection borrowed from the
+    * `PoolName` pool; the `DB` handle is managed by `using`.
+    *
+    * @param f work to perform with the transactional session
+    * @return whatever `f` returns
+    */
+  def withTxSession[T](f: => DBSession => T): T = {
+    val borrowed = DB(ConnectionPool.borrow(PoolName))
+
+    using(borrowed) { db =>
+      db.localTx { session => f(session) }
+    }
+  }
+
+  /**
+    * Runs `f` with a read-only session on a connection borrowed from the `PoolName`
+    * pool; the `DB` handle is managed by `using`.
+    *
+    * @param f work to perform with the read-only session
+    * @return whatever `f` returns
+    */
+  def withReadOnlySession[T](f: => DBSession => T): T = {
+    val borrowed = DB(ConnectionPool.borrow(PoolName))
+
+    using(borrowed) { db =>
+      val session = db.readOnlySession()
+      f(session)
+    }
+  }
+
+  /**
+    * Drops the table of `label` if it exists, inside its own transaction.
+    */
+  def dropTable(label: Label): Unit = {
+    val statement = SQLSyntax.createUnsafely(s"DROP TABLE IF EXISTS ${labelTableName(label)}")
+
+    withTxSession { session =>
+      sql"${statement}".execute().apply()(session)
+    }
+  }
+
+  def tables = 
DB(ConnectionPool.borrow(PoolName)).getTableNames().map(_.toLowerCase()) // lower-cased table names reported through a connection from the PoolName pool
+
+  /**
+    * Creates the table of `label` unless a table with that name (compared in lower
+    * case) already exists, in which case the fact is only logged.
+    *
+    * @return always `true`
+    */
+  def createTable(label: Label): Boolean = {
+    val ddl = showSchema(label)
+    val tableName = labelTableName(label)
+    val alreadyExists = tables.contains(tableName.toLowerCase())
+
+    if (!alreadyExists) {
+      withTxSession { session =>
+        sql"${SQLSyntax.createUnsafely(ddl)}".execute().apply()(session)
+      }
+    } else {
+      logger.info(s"Table exists ${tableName}")
+    }
+
+    true
+  }
+
+  /**
+    * Loads the JDBC driver class named by "driver" and, when the `PoolName` pool has
+    * not been initialised yet, registers it from the "url", "user" and "password"
+    * entries with a fixed pool size of 10.
+    *
+    * @param config connection settings
+    */
+  def prepareConnection(config: Config): Unit = {
+    Class.forName(config.getString("driver"))
+
+    val poolMissing = !ConnectionPool.isInitialized(PoolName)
+    if (poolMissing) {
+      val settings = ConnectionPoolSettings(initialSize = 10, maxSize = 10, connectionTimeoutMillis = 30000L, validationQuery = "select 1;")
+
+      ConnectionPool.add(
+        PoolName,
+        config.getString("url"),
+        config.getString("user"),
+        config.getString("password"),
+        settings
+      )
+    }
+  }
+}
+
+class JdbcStorage(graph: S2GraphLike, config: Config) { // bundles a JDBC edge fetcher and edge mutator, both initialised from `config` at construction
+
+  implicit val ec = graph.ec
+
+  JdbcStorage.prepareConnection(config)
+
+  val h2EdgeFetcher = new JdbcEdgeFetcher(graph)
+  h2EdgeFetcher.init(config) // NOTE(review): init reads "labelName" from the config it is given; confirm callers put it into `config`
+
+  val h2EdgeMutator = new JdbcEdgeMutator(graph)
+  h2EdgeMutator.init(config) // NOTE(review): same "labelName" requirement as the fetcher above
+
+  val edgeFetcher: EdgeFetcher = h2EdgeFetcher
+  val edgeMutator: EdgeMutator = h2EdgeMutator
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/3ddea3f7/s2core/src/main/scala/org/apache/s2graph/core/utils/Importer.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/utils/Importer.scala 
b/s2core/src/main/scala/org/apache/s2graph/core/utils/Importer.scala
index 300106a..1a5cbde 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/utils/Importer.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/utils/Importer.scala
@@ -24,8 +24,7 @@ import java.io.File
 import com.typesafe.config.Config
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.s2graph.core.{EdgeFetcher, S2GraphLike}
-import org.apache.s2graph.core.utils.logger
+import org.apache.s2graph.core.{S2GraphLike}
 
 import scala.concurrent.{ExecutionContext, Future}
 

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/3ddea3f7/s2core/src/main/scala/org/apache/s2graph/core/utils/SafeUpdateCache.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/main/scala/org/apache/s2graph/core/utils/SafeUpdateCache.scala 
b/s2core/src/main/scala/org/apache/s2graph/core/utils/SafeUpdateCache.scala
index 3ecb02f..323ee9d 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/utils/SafeUpdateCache.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/utils/SafeUpdateCache.scala
@@ -24,7 +24,7 @@ import java.util.concurrent.atomic.AtomicBoolean
 
 import com.google.common.cache.CacheBuilder
 import com.google.common.hash.Hashing
-import com.typesafe.config.Config
+import com.typesafe.config.{Config, ConfigFactory}
 
 import scala.collection.JavaConversions._
 import scala.concurrent.{ExecutionContext, Future}
@@ -85,8 +85,17 @@ class SafeUpdateCache(val config: Config)
 
   import java.lang.{Long => JLong}
   import SafeUpdateCache._
+
+
   val maxSize = config.getInt(SafeUpdateCache.MaxSizeKey)
   val systemTtl = config.getInt(SafeUpdateCache.TtlKey)
+
+  /**
+   * Convenience constructor that builds the backing `Config` from explicit values.
+   *
+   * Written with `=` rather than procedure syntax, which newer Scala versions
+   * deprecate for constructors.
+   *
+   * @param maxSize   value stored under `SafeUpdateCache.MaxSizeKey`
+   * @param systemTtl value stored under `SafeUpdateCache.TtlKey`
+   * @param ec        execution context made implicitly available to the delegated constructor call
+   */
+  def this(maxSize: Int, systemTtl: Int)(implicit ec: ExecutionContext) = {
+    this(ConfigFactory.parseMap(
+      Map(SafeUpdateCache.MaxSizeKey -> maxSize, SafeUpdateCache.TtlKey -> systemTtl)
+    ))
+  }
+
   private val cache = CacheBuilder.newBuilder().maximumSize(maxSize)
     .build[JLong, (AnyRef, Int, AtomicBoolean)]()
 
@@ -114,9 +123,14 @@ class SafeUpdateCache(val config: Config)
     cache.invalidate(cacheKey)
   }
 
+  /** Default `onEvict` hook: logs the runtime class name and the value it is given. */
+  def defaultOnEvict(oldValue: AnyRef): Unit =
+    logger.info(s"[SafeUpdateCache]: ${oldValue.getClass.getName} ${oldValue} is evicted.")
+
   def withCache[T <: AnyRef](key: String,
                              broadcast: Boolean,
-                             cacheTTLInSecs: Option[Int] = None)(op: => T): T 
= {
+                             cacheTTLInSecs: Option[Int] = None,
+                             onEvict: AnyRef => Unit = defaultOnEvict)(op: => 
T): T = {
     val cacheKey = toCacheKey(key)
     val cachedValWithTs = cache.getIfPresent(cacheKey)
 
@@ -145,6 +159,9 @@ class SafeUpdateCache(val config: Config)
               logger.error(s"withCache update failed: $cacheKey", ex)
             case Success(newValue) =>
               put(key, newValue, broadcast = (broadcast && newValue != 
cachedVal))
+
+              onEvict(cachedVal)
+
               logger.info(s"withCache update success: $cacheKey")
           }
 

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/3ddea3f7/s2core/src/test/scala/org/apache/s2graph/core/fetcher/BaseFetcherTest.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/test/scala/org/apache/s2graph/core/fetcher/BaseFetcherTest.scala 
b/s2core/src/test/scala/org/apache/s2graph/core/fetcher/BaseFetcherTest.scala
index 1b71a4c..79021c8 100644
--- 
a/s2core/src/test/scala/org/apache/s2graph/core/fetcher/BaseFetcherTest.scala
+++ 
b/s2core/src/test/scala/org/apache/s2graph/core/fetcher/BaseFetcherTest.scala
@@ -23,7 +23,7 @@ import com.typesafe.config.{Config, ConfigFactory}
 import org.apache.s2graph.core.Management.JsonModel.{Index, Prop}
 import org.apache.s2graph.core.rest.RequestParser
 import org.apache.s2graph.core._
-import org.apache.s2graph.core.schema.{Label, Service, ServiceColumn}
+import org.apache.s2graph.core.schema.{Label, LabelMeta, Service, 
ServiceColumn}
 import org.scalatest._
 
 import scala.concurrent.{Await, ExecutionContext}
@@ -67,7 +67,13 @@ trait BaseFetcherTest extends FunSuite with Matchers with 
BeforeAndAfterAll {
     val serviceColumn =
       management.createServiceColumn(serviceName, columnName, "string", Nil)
 
-    Label.findByName(labelName, useCache = false).foreach { label => 
Label.delete(label.id.get) }
+    Label.findByName(labelName, useCache = false).foreach { label =>
+      label.labelMetaSet.foreach { lm =>
+        LabelMeta.delete(lm.id.get)
+      }
+
+      Label.delete(label.id.get)
+    }
 
     val label = management.createLabel(
       labelName,
@@ -81,12 +87,13 @@ trait BaseFetcherTest extends FunSuite with Matchers with 
BeforeAndAfterAll {
       Seq.empty[Index],
       Seq(Prop(name = "score", defaultValue = "0.0", dataType = "double")),
       isDirected = true,
-      consistencyLevel =  "strong",
+      consistencyLevel = "strong",
       hTableName = None,
       hTableTTL = None,
       schemaVersion = "v3",
-      compressionAlgorithm =  "gz",
-      options = options
+      compressionAlgorithm = "gz",
+      options = options,
+      initFetcherWithOptions = true
     ).get
 
     management.updateEdgeFetcher(label, options)

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/3ddea3f7/s2core/src/test/scala/org/apache/s2graph/core/storage/jdbc/JdbcStorageTest.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/test/scala/org/apache/s2graph/core/storage/jdbc/JdbcStorageTest.scala
 
b/s2core/src/test/scala/org/apache/s2graph/core/storage/jdbc/JdbcStorageTest.scala
new file mode 100644
index 0000000..f06b2e6
--- /dev/null
+++ 
b/s2core/src/test/scala/org/apache/s2graph/core/storage/jdbc/JdbcStorageTest.scala
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.s2graph.core.fetcher.sql
+
+import org.apache.s2graph.core.Management.JsonModel._
+import org.apache.s2graph.core._
+import org.apache.s2graph.core.fetcher.BaseFetcherTest
+import org.apache.s2graph.core.schema._
+import org.apache.s2graph.core.storage.jdbc._
+
+import scala.concurrent._
+import scala.concurrent.duration._
+
+/**
+ * Exercises the JDBC-backed edge storage: schema generation for labels with
+ * different consistency levels and indices, and a mutate/fetch round trip.
+ *
+ * The "Mutate and fetch" tests look up labels "A" and "C", which are created by
+ * the "CreateLabel" tests registered before them.
+ */
+class JdbcStorageTest extends BaseFetcherTest {
+  implicit lazy val ec = graph.ec
+
+  val edgeFetcherName = classOf[JdbcEdgeFetcher].getName
+  val edgeMutatorName = classOf[JdbcEdgeMutator].getName
+
+  // Label options: the fetcher and the mutator both point at the same H2 file database.
+  val options =
+    s"""
+       |{
+       |  "fetcher": {
+       |    "className": "${edgeFetcherName}",
+       |    "url": "jdbc:h2:file:/tmp/s2graph/metastore;MODE=MYSQL",
+       |    "driver": "org.h2.Driver",
+       |    "password": "sa",
+       |    "user": "sa"
+       |  },
+       |  "mutator": {
+       |    "className": "${edgeMutatorName}",
+       |    "url": "jdbc:h2:file:/tmp/s2graph/metastore;MODE=MYSQL",
+       |    "driver": "org.h2.Driver",
+       |    "password": "sa",
+       |    "user": "sa"
+       |  }
+       |}
+       """.stripMargin
+
+  val serviceName = "s2graph"
+  val columnName = "user"
+
+  var service: Service = _
+  var serviceColumn: ServiceColumn = _
+
+  // Upper bound for every blocking wait in this suite.
+  private val timeout = 10.seconds
+
+  // Properties shared by all labels created in this suite.
+  private val scoreAndAgeProps = Seq(
+    Prop(name = "score", defaultValue = "0.0", dataType = "double"),
+    Prop(name = "age", defaultValue = "0", dataType = "int")
+  )
+
+  override def beforeAll: Unit = {
+    super.beforeAll
+
+    service = management.createService(serviceName, "localhost", "s2graph_htable", -1, None).get
+    serviceColumn = management.createServiceColumn(serviceName, columnName, "string", Nil)
+  }
+
+  /** Deletes the label named `labelName`, together with its label metas, if it exists. */
+  def clearLabel(labelName: String): Unit = {
+    Label.findByName(labelName, useCache = false).foreach { label =>
+      label.labelMetaSet.foreach { lm =>
+        LabelMeta.delete(lm.id.get)
+      }
+      Label.delete(label.id.get)
+    }
+  }
+
+  /**
+   * Re-creates the label `labelName` from scratch with the suite's JDBC options
+   * and registers its edge fetcher.
+   *
+   * @param labelName        name of the label to (re)create
+   * @param consistencyLevel consistency level passed to `management.createLabel`
+   * @param props            label properties
+   * @param indices          label indices
+   * @return the newly created label
+   */
+  def createLabel(labelName: String,
+                  consistencyLevel: String,
+                  props: Seq[Prop],
+                  indices: Seq[Index]): Label = {
+
+    clearLabel(labelName)
+
+    val label = management.createLabel(
+      labelName,
+      service.serviceName, serviceColumn.columnName, serviceColumn.columnType,
+      service.serviceName, serviceColumn.columnName, serviceColumn.columnType,
+      service.serviceName,
+      indices,
+      props,
+      isDirected = true, consistencyLevel = consistencyLevel, hTableName = None, hTableTTL = None,
+      schemaVersion = "v3", compressionAlgorithm = "gz", options = Option(options),
+      initFetcherWithOptions = true
+    ).get
+
+    management.updateEdgeFetcher(label, Option(options))
+    label
+  }
+
+  /** Trims every line and drops empty ones so schemas compare independently of indentation. */
+  def normalise(s: String): String =
+    s.split("\n").map(_.trim).filterNot(_.isEmpty).mkString("\n")
+
+  test("CreateLabel - label A: {strong, defaultIndex}") {
+    val label = createLabel("A", "strong", scoreAndAgeProps, Nil)
+
+    val expectedColumn = Seq("_timestamp", "_from", "_to", "age", "score")
+    val actualColumn = JdbcStorage.affectedColumns(label)
+
+    actualColumn shouldBe expectedColumn
+
+    val expectedSchema =
+      """
+      CREATE TABLE `_EDGE_STORE_A`(
+        `id` int(11) NOT NULL AUTO_INCREMENT,
+        `_timestamp` TIMESTAMP NOT NULL default CURRENT_TIMESTAMP,
+        `_from` varchar(256) NOT NULL,
+        `_to` varchar(256) NOT NULL,
+        PRIMARY KEY (`id`),
+        `age` int(32),
+        `score` double,
+        KEY `A__PK` (`_timestamp`),
+        UNIQUE KEY `A_from` (`_from`,`_to`),
+        UNIQUE KEY `A_to` (`_to`,`_from`)
+      ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
+      """
+
+    val actualSchema = JdbcStorage.showSchema(label)
+    normalise(actualSchema) shouldBe normalise(expectedSchema)
+  }
+
+  test("CreateLabel - label B: {strong, (age, score) index}") {
+    val indices = Seq(
+      Index("idx_age_score", Seq("age", "score"))
+    )
+
+    val label = createLabel("B", "strong", scoreAndAgeProps, indices)
+
+    val expectedColumn = Seq("_timestamp", "_from", "_to", "age", "score")
+    val actualColumn = JdbcStorage.affectedColumns(label)
+
+    actualColumn shouldBe expectedColumn
+
+    val expectedSchema =
+      """
+      CREATE TABLE `_EDGE_STORE_B`(
+        `id` int(11) NOT NULL AUTO_INCREMENT,
+        `_timestamp` TIMESTAMP NOT NULL default CURRENT_TIMESTAMP,
+        `_from` varchar(256) NOT NULL,
+        `_to` varchar(256) NOT NULL,
+        PRIMARY KEY (`id`),
+        `age` int(32),
+        `score` double,
+        KEY `B_idx_age_score` (`age`, `score`),
+        UNIQUE KEY `B_from` (`_from`,`_to`),
+        UNIQUE KEY `B_to` (`_to`,`_from`)
+      ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
+      """
+
+    val actualSchema = JdbcStorage.showSchema(label)
+    normalise(actualSchema) shouldBe normalise(expectedSchema)
+  }
+
+  test("CreateLabel - label C: {weak, defaultIndex}") {
+    val label = createLabel("C", "weak", scoreAndAgeProps, Nil)
+
+    val expectedColumn = Seq("_timestamp", "_from", "_to", "age", "score")
+    val actualColumn = JdbcStorage.affectedColumns(label)
+
+    actualColumn shouldBe expectedColumn
+
+    val expectedSchema =
+      """
+      CREATE TABLE `_EDGE_STORE_C`(
+        `id` int(11) NOT NULL AUTO_INCREMENT,
+        `_timestamp` TIMESTAMP NOT NULL default CURRENT_TIMESTAMP,
+        `_from` varchar(256) NOT NULL,
+        `_to` varchar(256) NOT NULL,
+        PRIMARY KEY (`id`),
+        `age` int(32),
+        `score` double,
+        KEY `C__PK` (`_timestamp`),
+        KEY `C_from` (`_from`,`_to`),
+        KEY `C_to` (`_to`,`_from`)
+      ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
+      """
+
+    val actualSchema = JdbcStorage.showSchema(label)
+    normalise(actualSchema) shouldBe normalise(expectedSchema)
+  }
+
+  test("Mutate and fetch edges - label A") {
+    val label = Label.findByName("A", useCache = false)
+      .getOrElse(fail("label A not found; it is created by the CreateLabel test for label A"))
+
+    // Start from an empty table so earlier runs cannot leak rows into this test.
+    JdbcStorage.dropTable(label)
+    JdbcStorage.createTable(label)
+
+    val fetcher = graph.getEdgeFetcher(label)
+    val mutator = graph.getEdgeMutator(label)
+
+    val edgeElric =
+      graph.toEdge("daewon", "elric", label.label, "out", Map("score" -> 90), ts = 10)
+
+    val edgeRain =
+      graph.toEdge("daewon", "rain", label.label, "out", Map("score" -> 50), ts = 5)
+
+    val edgeShon =
+      graph.toEdge("daewon", "shon", label.label, "out", Map("score" -> 100), ts = 15)
+
+    val edgeElricUpdated =
+      graph.toEdge("daewon", "elric", label.label, "out", Map("score" -> 200), ts = 20)
+
+    val insertEdges = Seq(edgeShon, edgeElric, edgeRain)
+    val updateEdges = Seq(edgeElricUpdated)
+
+    // Await.result (unlike Await.ready) rethrows a failed mutation instead of hiding it.
+    Await.result(mutator.mutateStrongEdges("", insertEdges, true), timeout)
+    Await.result(mutator.mutateStrongEdges("", updateEdges, true), timeout)
+
+    val qp = QueryParam(labelName = label.label, offset = 0, limit = 10)
+    val vertex = edgeElric.srcVertex
+    val step = Step(Seq(qp))
+    val q = Query(Seq(vertex), steps = Vector(step))
+    val qr = QueryRequest(q, 0, vertex, qp)
+
+    val fetchedEdges = Await.result(fetcher.fetches(Seq(qr), Map.empty), timeout)
+      .flatMap(_.edgeWithScores.map(_.edge))
+
+    fetchedEdges shouldBe Seq(edgeElricUpdated, edgeShon, edgeRain) // order by timestamp desc
+  }
+
+  test("Mutate and fetch edges - label C") {
+    val label = Label.findByName("C", useCache = false)
+      .getOrElse(fail("label C not found; it is created by the CreateLabel test for label C"))
+
+    // Start from an empty table so earlier runs cannot leak rows into this test.
+    JdbcStorage.dropTable(label)
+    JdbcStorage.createTable(label)
+
+    val fetcher = graph.getEdgeFetcher(label)
+    val mutator = graph.getEdgeMutator(label)
+
+    val edgeShon =
+      graph.toEdge("daewon", "shon", label.label, "out", Map("score" -> 90), ts = 10)
+
+    val edgeShon2 =
+      graph.toEdge("daewon", "shon", label.label, "out", Map("score" -> 50), ts = 5)
+
+    val insertEdges = Seq(edgeShon, edgeShon2)
+
+    // Await.result (unlike Await.ready) rethrows a failed mutation instead of hiding it.
+    Await.result(mutator.mutateStrongEdges("", insertEdges, true), timeout)
+
+    val qp = QueryParam(labelName = label.label, offset = 0, limit = 10,
+      duplicatePolicy = DuplicatePolicy.Raw)
+    val vertex = edgeShon.srcVertex
+    val step = Step(Seq(qp))
+    val q = Query(Seq(vertex), steps = Vector(step))
+    val qr = QueryRequest(q, 0, vertex, qp)
+
+    val fetchedEdges = Await.result(fetcher.fetches(Seq(qr), Map.empty), timeout)
+      .flatMap(_.edgeWithScores.map(_.edge))
+
+    fetchedEdges shouldBe Seq(edgeShon, edgeShon2)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/3ddea3f7/s2graphql/src/test/scala/org/apache/s2graph/graphql/TestGraph.scala
----------------------------------------------------------------------
diff --git 
a/s2graphql/src/test/scala/org/apache/s2graph/graphql/TestGraph.scala 
b/s2graphql/src/test/scala/org/apache/s2graph/graphql/TestGraph.scala
index 72412f5..ef01d52 100644
--- a/s2graphql/src/test/scala/org/apache/s2graph/graphql/TestGraph.scala
+++ b/s2graphql/src/test/scala/org/apache/s2graph/graphql/TestGraph.scala
@@ -77,7 +77,7 @@ trait TestGraph {
 }
 
 class EmptyGraph(config: Config) extends TestGraph {
-  Schema.apply(config)
+  org.apache.s2graph.core.schema.Schema.apply(config)
 
   lazy val graph = new 
S2Graph(config)(scala.concurrent.ExecutionContext.Implicits.global)
   lazy val management = new Management(graph)
@@ -92,7 +92,7 @@ class EmptyGraph(config: Config) extends TestGraph {
   override def repository: GraphRepository = s2Repository
 
   override def open(): Unit = {
-    Schema.shutdown(true)
+    org.apache.s2graph.core.schema.Schema.shutdown(true)
   }
 
 }
@@ -131,9 +131,10 @@ class BasicGraph(config: Config) extends 
EmptyGraph(config) {
       labelName,
       serviceName, columnName, "string",
       serviceName, columnName, "string",
-      true, serviceName,
+      serviceName,
       Nil,
       Seq(Prop("score", "0", "int")),
+      true,
       "strong"
     )
 }

Reply via email to