Repository: incubator-s2graph
Updated Branches:
  refs/heads/master 7af37dbd3 -> 33f4d0550


- Add Fetcher/Mutator interface for query/mutation.
- Refactor Storage to use Fetcher/Mutator interface.


Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/2357d810
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/2357d810
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/2357d810

Branch: refs/heads/master
Commit: 2357d810a6419011d4fd38af248089d9551a200c
Parents: 7af37db
Author: DO YUNG YOON <[email protected]>
Authored: Tue May 8 16:27:07 2018 +0900
Committer: DO YUNG YOON <[email protected]>
Committed: Tue May 8 16:27:07 2018 +0900

----------------------------------------------------------------------
 project/Common.scala                            |   2 +
 s2core/build.sbt                                |   2 +-
 .../scala/org/apache/s2graph/core/Fetcher.scala |  36 ++++++
 .../org/apache/s2graph/core/Management.scala    |  15 ++-
 .../scala/org/apache/s2graph/core/Mutator.scala |  41 +++++++
 .../org/apache/s2graph/core/QueryResult.scala   |   4 +-
 .../scala/org/apache/s2graph/core/S2Graph.scala |  39 ++++--
 .../org/apache/s2graph/core/S2GraphLike.scala   |  11 ++
 .../apache/s2graph/core/TraversalHelper.scala   |   8 +-
 .../s2graph/core/model/ImportStatus.scala       |  59 +++++++++
 .../apache/s2graph/core/model/Importer.scala    | 122 +++++++++++++++++++
 .../s2graph/core/model/MemoryModelFetcher.scala |  59 +++++++++
 .../s2graph/core/model/ModelManager.scala       | 103 ++++++++++++++++
 .../org/apache/s2graph/core/schema/Label.scala  |  35 +++---
 .../org/apache/s2graph/core/schema/Schema.scala |   4 +-
 .../apache/s2graph/core/schema/Service.scala    |   2 +-
 .../apache/s2graph/core/storage/Storage.scala   |  45 +++----
 .../s2graph/core/storage/StorageReadable.scala  |  22 ++--
 .../s2graph/core/storage/StorageWritable.scala  |  19 +--
 .../storage/WriteWriteConflictResolver.scala    |   2 +-
 .../core/storage/hbase/AsynchbaseStorage.scala  |  12 +-
 .../hbase/AsynchbaseStorageWritable.scala       |  11 +-
 .../core/storage/rocks/RocksStorage.scala       |  11 +-
 .../storage/rocks/RocksStorageReadable.scala    |   2 +-
 .../storage/rocks/RocksStorageWritable.scala    |  10 +-
 .../core/storage/serde/MutationHelper.scala     |  32 +++--
 .../apache/s2graph/core/model/FetcherTest.scala |  87 +++++++++++++
 .../apache/s2graph/graphql/GraphQLServer.scala  |  18 ++-
 .../org/apache/s2graph/graphql/HttpServer.scala |   2 +
 29 files changed, 695 insertions(+), 120 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/2357d810/project/Common.scala
----------------------------------------------------------------------
diff --git a/project/Common.scala b/project/Common.scala
index 96109d3..b46d190 100644
--- a/project/Common.scala
+++ b/project/Common.scala
@@ -33,6 +33,8 @@ object Common {
 
   val KafkaVersion = "0.10.2.1"
 
+  val rocksVersion = "5.11.3"
+
   /** use Log4j 1.2.17 as the SLF4j backend in runtime, with bridging 
libraries to forward JCL and JUL logs to SLF4j */
   val loggingRuntime = Seq(
     "log4j" % "log4j" % "1.2.17",

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/2357d810/s2core/build.sbt
----------------------------------------------------------------------
diff --git a/s2core/build.sbt b/s2core/build.sbt
index cc70e97..12319d8 100644
--- a/s2core/build.sbt
+++ b/s2core/build.sbt
@@ -50,7 +50,7 @@ libraryDependencies ++= Seq(
   "org.apache.hadoop" % "hadoop-hdfs" % hadoopVersion ,
   "org.apache.lucene" % "lucene-core" % "6.6.0",
   "org.apache.lucene" % "lucene-queryparser" % "6.6.0",
-  "org.rocksdb" % "rocksdbjni" % "5.8.0",
+  "org.rocksdb" % "rocksdbjni" % rocksVersion,
   "org.scala-lang.modules" %% "scala-java8-compat" % "0.8.0",
   "com.sksamuel.elastic4s" %% "elastic4s-core" % elastic4sVersion 
excludeLogging(),
   "com.sksamuel.elastic4s" %% "elastic4s-http" % elastic4sVersion 
excludeLogging(),

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/2357d810/s2core/src/main/scala/org/apache/s2graph/core/Fetcher.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/Fetcher.scala 
b/s2core/src/main/scala/org/apache/s2graph/core/Fetcher.scala
new file mode 100644
index 0000000..57d2f29
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/Fetcher.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.s2graph.core
+
+import com.typesafe.config.Config
+import org.apache.s2graph.core.types.VertexId
+
+import scala.concurrent.{ExecutionContext, Future}
+
+trait Fetcher {
+
+  def init(config: Config)(implicit ec: ExecutionContext): Future[Fetcher] =
+    Future.successful(this)
+
+  def fetches(queryRequests: Seq[QueryRequest],
+              prevStepEdges: Map[VertexId, Seq[EdgeWithScore]])(implicit ec: 
ExecutionContext): Future[Seq[StepResult]]
+
+  def close(): Unit = {}
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/2357d810/s2core/src/main/scala/org/apache/s2graph/core/Management.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/Management.scala 
b/s2core/src/main/scala/org/apache/s2graph/core/Management.scala
index d026e5b..7ff5a9e 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/Management.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/Management.scala
@@ -20,6 +20,7 @@
 package org.apache.s2graph.core
 
 import java.util
+import java.util.concurrent.Executors
 
 import com.typesafe.config.{Config, ConfigFactory}
 import org.apache.s2graph.core.GraphExceptions.{InvalidHTableException, 
LabelAlreadyExistException, LabelNameTooLongException, LabelNotExistException}
@@ -28,8 +29,10 @@ import org.apache.s2graph.core.schema._
 import org.apache.s2graph.core.types.HBaseType._
 import org.apache.s2graph.core.types._
 import org.apache.s2graph.core.JSONParser._
+import org.apache.s2graph.core.model.Importer
 import play.api.libs.json._
 
+import scala.concurrent.{ExecutionContext, Future}
 import scala.util.Try
 
 /**
@@ -70,7 +73,6 @@ object Management {
     case class Index(name: String, propNames: Seq[String], direction: 
Option[Int] = None, options: Option[String] = None)
   }
 
-
   def findService(serviceName: String) = {
     Service.findByName(serviceName, useCache = false)
   }
@@ -298,9 +300,18 @@ object Management {
 
 class Management(graph: S2GraphLike) {
 
+  val importEx = 
ExecutionContext.fromExecutor(Executors.newSingleThreadExecutor())
 
   import Management._
-  import scala.collection.JavaConversions._
+
+  def importModel(labelName: String, options: String): Future[Importer] = {
+    Label.updateOption(labelName, options)
+
+    val label = Label.findByName(labelName, false).getOrElse(throw new 
LabelNotExistException(labelName))
+    val config = ConfigFactory.parseString(options)
+
+    graph.modelManager.importModel(label, config)(importEx)
+  }
 
   def createStorageTable(zkAddr: String,
                   tableName: String,

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/2357d810/s2core/src/main/scala/org/apache/s2graph/core/Mutator.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/Mutator.scala 
b/s2core/src/main/scala/org/apache/s2graph/core/Mutator.scala
new file mode 100644
index 0000000..53161e1
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/Mutator.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.s2graph.core
+
+import org.apache.s2graph.core.storage.{MutateResponse, SKeyValue}
+
+import scala.concurrent.{ExecutionContext, Future}
+
+trait Mutator {
+  def mutateVertex(zkQuorum: String, vertex: S2VertexLike, withWait: 
Boolean)(implicit ec: ExecutionContext): Future[MutateResponse]
+
+  def mutateStrongEdges(zkQuorum: String, _edges: Seq[S2EdgeLike], withWait: 
Boolean)(implicit ec: ExecutionContext): Future[Seq[Boolean]]
+
+  def mutateWeakEdges(zkQuorum: String, _edges: Seq[S2EdgeLike], withWait: 
Boolean)(implicit ec: ExecutionContext): Future[Seq[(Int, Boolean)]]
+
+  def incrementCounts(zkQuorum: String, edges: Seq[S2EdgeLike], withWait: 
Boolean)(implicit ec: ExecutionContext): Future[Seq[MutateResponse]]
+
+  def updateDegree(zkQuorum: String, edge: S2EdgeLike, degreeVal: Long = 
0)(implicit ec: ExecutionContext): Future[MutateResponse]
+
+  def deleteAllFetchedEdgesAsyncOld(stepInnerResult: StepResult,
+                                    requestTs: Long,
+                                    retryNum: Int)(implicit ec: 
ExecutionContext): Future[Boolean]
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/2357d810/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala 
b/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala
index be57017..4a1018f 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala
@@ -237,7 +237,7 @@ object StepResult {
 
           //          val newOrderByValues = 
updateScoreOnOrderByValues(globalQueryOption.scoreFieldIdx, t.orderByValues, 
newScore)
           val newOrderByValues =
-            if (globalQueryOption.orderByKeys.isEmpty) (newScore, 
t.edge.getTsInnerValValue(), None, None)
+            if (globalQueryOption.orderByKeys.isEmpty) (newScore, 
t.edge.getTs(), None, None)
             else toTuple4(newT.toValues(globalQueryOption.orderByKeys))
 
           val newGroupByValues = newT.toValues(globalQueryOption.groupBy.keys)
@@ -262,7 +262,7 @@ object StepResult {
 //            val newOrderByValues = 
updateScoreOnOrderByValues(globalQueryOption.scoreFieldIdx, t.orderByValues, 
newScore)
 
             val newOrderByValues =
-              if (globalQueryOption.orderByKeys.isEmpty) (newScore, 
t.edge.getTsInnerValValue(), None, None)
+              if (globalQueryOption.orderByKeys.isEmpty) (newScore, 
t.edge.getTs(), None, None)
               else toTuple4(newT.toValues(globalQueryOption.orderByKeys))
 
             val newGroupByValues = 
newT.toValues(globalQueryOption.groupBy.keys)

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/2357d810/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala 
b/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
index 7816a63..4b2274a 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
@@ -27,6 +27,7 @@ import com.typesafe.config.{Config, ConfigFactory}
 import org.apache.commons.configuration.{BaseConfiguration, Configuration}
 import org.apache.s2graph.core.index.IndexProvider
 import org.apache.s2graph.core.io.tinkerpop.optimize.S2GraphStepStrategy
+import org.apache.s2graph.core.model.ModelManager
 import org.apache.s2graph.core.schema._
 import org.apache.s2graph.core.storage.hbase.AsynchbaseStorage
 import org.apache.s2graph.core.storage.rocks.RocksStorage
@@ -186,6 +187,8 @@ class S2Graph(_config: Config)(implicit val ec: 
ExecutionContext) extends S2Grap
 
   override val management = new Management(this)
 
+  override val modelManager = new ModelManager(this)
+
   override val indexProvider = IndexProvider.apply(config)
 
   override val elementBuilder = new GraphElementBuilder(this)
@@ -247,6 +250,25 @@ class S2Graph(_config: Config)(implicit val ec: 
ExecutionContext) extends S2Grap
     storagePool.getOrElse(s"label:${label.label}", defaultStorage)
   }
 
+  //TODO:
+  override def getFetcher(column: ServiceColumn): Fetcher = {
+    getStorage(column.service).reader
+  }
+
+  override def getFetcher(label: Label): Fetcher = {
+    if (label.fetchConfigExist) modelManager.getFetcher(label)
+    else getStorage(label).reader
+  }
+
+  override def getMutator(column: ServiceColumn): Mutator = {
+    getStorage(column.service).mutator
+  }
+
+  override def getMutator(label: Label): Mutator = {
+    getStorage(label).mutator
+  }
+
+  //TODO:
   override def flushStorage(): Unit = {
     storagePool.foreach { case (_, storage) =>
 
@@ -302,7 +324,7 @@ class S2Graph(_config: Config)(implicit val ec: 
ExecutionContext) extends S2Grap
     def mutateVertices(storage: Storage)(zkQuorum: String, vertices: 
Seq[S2VertexLike],
                                          withWait: Boolean = false): 
Future[Seq[MutateResponse]] = {
       val futures = vertices.map { vertex =>
-        storage.mutateVertex(zkQuorum, vertex, withWait)
+        getMutator(vertex.serviceColumn).mutateVertex(zkQuorum, vertex, 
withWait)
       }
       Future.sequence(futures)
     }
@@ -329,12 +351,12 @@ class S2Graph(_config: Config)(implicit val ec: 
ExecutionContext) extends S2Grap
 
     val weakEdgesFutures = weakEdges.groupBy { case (edge, idx) => 
edge.innerLabel.hbaseZkAddr }.map { case (zkQuorum, edgeWithIdxs) =>
       val futures = edgeWithIdxs.groupBy(_._1.innerLabel).map { case (label, 
edgeGroup) =>
-        val storage = getStorage(label)
+        val mutator = getMutator(label)
         val edges = edgeGroup.map(_._1)
         val idxs = edgeGroup.map(_._2)
 
         /* multiple edges with weak consistency level will be processed as 
batch */
-        storage.mutateWeakEdges(zkQuorum, edges, withWait)
+        mutator.mutateWeakEdges(zkQuorum, edges, withWait)
       }
       Future.sequence(futures)
     }
@@ -347,9 +369,10 @@ class S2Graph(_config: Config)(implicit val ec: 
ExecutionContext) extends S2Grap
     val strongEdgesFutures = strongEdgesAll.groupBy { case (edge, idx) => 
edge.innerLabel }.map { case (label, edgeGroup) =>
       val edges = edgeGroup.map(_._1)
       val idxs = edgeGroup.map(_._2)
-      val storage = getStorage(label)
+      val mutator = getMutator(label)
       val zkQuorum = label.hbaseZkAddr
-      storage.mutateStrongEdges(zkQuorum, edges, withWait = true).map { rets =>
+
+      mutator.mutateStrongEdges(zkQuorum, edges, withWait = true).map { rets =>
         idxs.zip(rets)
       }
     }
@@ -474,7 +497,7 @@ class S2Graph(_config: Config)(implicit val ec: 
ExecutionContext) extends S2Grap
   override def incrementCounts(edges: Seq[S2EdgeLike], withWait: Boolean): 
Future[Seq[MutateResponse]] = {
     val edgesWithIdx = edges.zipWithIndex
     val futures = edgesWithIdx.groupBy { case (e, idx) => e.innerLabel }.map { 
case (label, edgeGroup) =>
-      getStorage(label).incrementCounts(label.hbaseZkAddr, 
edgeGroup.map(_._1), withWait).map(_.zip(edgeGroup.map(_._2)))
+      getMutator(label).incrementCounts(label.hbaseZkAddr, 
edgeGroup.map(_._1), withWait).map(_.zip(edgeGroup.map(_._2)))
     }
 
     Future.sequence(futures).map { ls =>
@@ -484,9 +507,9 @@ class S2Graph(_config: Config)(implicit val ec: 
ExecutionContext) extends S2Grap
 
   override def updateDegree(edge: S2EdgeLike, degreeVal: Long = 0): 
Future[MutateResponse] = {
     val label = edge.innerLabel
-    val storage = getStorage(label)
+    val mutator = getMutator(label)
 
-    storage.updateDegree(label.hbaseZkAddr, edge, degreeVal)
+    mutator.updateDegree(label.hbaseZkAddr, edge, degreeVal)
   }
 
   override def getVertex(vertexId: VertexId): Option[S2VertexLike] = {

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/2357d810/s2core/src/main/scala/org/apache/s2graph/core/S2GraphLike.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2GraphLike.scala 
b/s2core/src/main/scala/org/apache/s2graph/core/S2GraphLike.scala
index cbd31cc..6ed78b0 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/S2GraphLike.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/S2GraphLike.scala
@@ -31,6 +31,7 @@ import 
org.apache.s2graph.core.GraphExceptions.LabelNotExistException
 import org.apache.s2graph.core.S2Graph.{DefaultColumnName, DefaultServiceName}
 import org.apache.s2graph.core.features.{S2Features, S2GraphVariables}
 import org.apache.s2graph.core.index.IndexProvider
+import org.apache.s2graph.core.model.ModelManager
 import org.apache.s2graph.core.schema.{Label, LabelMeta, Service, 
ServiceColumn}
 import org.apache.s2graph.core.storage.{MutateResponse, Storage}
 import org.apache.s2graph.core.types.{InnerValLike, VertexId}
@@ -68,6 +69,8 @@ trait S2GraphLike extends Graph {
 
   val traversalHelper: TraversalHelper
 
+  val modelManager: ModelManager
+
   lazy val MaxRetryNum: Int = config.getInt("max.retry.number")
   lazy val MaxBackOff: Int = config.getInt("max.back.off")
   lazy val BackoffTimeout: Int = config.getInt("back.off.timeout")
@@ -90,6 +93,14 @@ trait S2GraphLike extends Graph {
 
   def getStorage(label: Label): Storage
 
+  def getFetcher(column: ServiceColumn): Fetcher
+
+  def getFetcher(label: Label): Fetcher
+
+  def getMutator(label: Label): Mutator
+
+  def getMutator(column: ServiceColumn): Mutator
+
   def flushStorage(): Unit
 
   def shutdown(modelDataDelete: Boolean = false): Unit

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/2357d810/s2core/src/main/scala/org/apache/s2graph/core/TraversalHelper.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/main/scala/org/apache/s2graph/core/TraversalHelper.scala 
b/s2core/src/main/scala/org/apache/s2graph/core/TraversalHelper.scala
index 0dc2aa2..d19dd1f 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/TraversalHelper.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/TraversalHelper.scala
@@ -204,7 +204,7 @@ class TraversalHelper(graph: S2GraphLike) {
     val aggFuture = requestsPerLabel.foldLeft(Future.successful(Map.empty[Int, 
StepResult])) { case (prevFuture, (label, reqWithIdxs)) =>
       for {
         prev <- prevFuture
-        cur <- graph.getStorage(label).fetches(reqWithIdxs.map(_._1), 
prevStepEdges)
+        cur <- graph.getFetcher(label).fetches(reqWithIdxs.map(_._1), 
prevStepEdges)
       } yield {
         prev ++ reqWithIdxs.map(_._2).zip(cur).toMap
       }
@@ -256,7 +256,7 @@ class TraversalHelper(graph: S2GraphLike) {
               */
             graph.mutateEdges(edgesToDelete.map(_.edge), withWait = 
true).map(_.forall(_.isSuccess))
           } else {
-            graph.getStorage(label).deleteAllFetchedEdgesAsyncOld(stepResult, 
requestTs, MaxRetryNum)
+            graph.getMutator(label).deleteAllFetchedEdgesAsyncOld(stepResult, 
requestTs, MaxRetryNum)
           }
         case _ =>
 
@@ -264,7 +264,7 @@ class TraversalHelper(graph: S2GraphLike) {
             * read: x
             * write: N x ((1(snapshotEdge) + 2(1 for incr, 1 for delete) x 
indices)
             */
-          graph.getStorage(label).deleteAllFetchedEdgesAsyncOld(stepResult, 
requestTs, MaxRetryNum)
+          graph.getMutator(label).deleteAllFetchedEdgesAsyncOld(stepResult, 
requestTs, MaxRetryNum)
       }
       ret
     }
@@ -389,7 +389,7 @@ class TraversalHelper(graph: S2GraphLike) {
             val newEdgeWithScore = edgeWithScore.copy(edge = newEdge)
             /* OrderBy */
             val orderByValues =
-              if (queryOption.orderByKeys.isEmpty) (score, 
edge.getTsInnerValValue(), None, None)
+              if (queryOption.orderByKeys.isEmpty) (score, edge.getTs(), None, 
None)
               else 
StepResult.toTuple4(newEdgeWithScore.toValues(queryOption.orderByKeys))
 
             /* StepGroupBy */

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/2357d810/s2core/src/main/scala/org/apache/s2graph/core/model/ImportStatus.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/main/scala/org/apache/s2graph/core/model/ImportStatus.scala 
b/s2core/src/main/scala/org/apache/s2graph/core/model/ImportStatus.scala
new file mode 100644
index 0000000..189a6d0
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/model/ImportStatus.scala
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.s2graph.core.model
+
+import java.util.concurrent.atomic.AtomicInteger
+
+trait ImportStatus {
+  val done: AtomicInteger
+
+  def isCompleted: Boolean
+
+  def percentage: Int
+
+  val total: Int
+}
+
+class ImportRunningStatus(val total: Int) extends ImportStatus {
+  require(total > 0, s"Total should be positive: $total")
+
+  val done = new AtomicInteger(0)
+
+  def isCompleted: Boolean = total == done.get
+
+  def percentage = 100 * done.get / total
+}
+
+case object ImportDoneStatus extends ImportStatus {
+  val total = 1
+
+  val done = new AtomicInteger(1)
+
+  def isCompleted: Boolean = true
+
+  def percentage = 100
+}
+
+object ImportStatus {
+  def apply(total: Int): ImportStatus = new ImportRunningStatus(total)
+
+  def unapply(importResult: ImportStatus): Option[(Boolean, Int, Int)] =
+    Some((importResult.isCompleted, importResult.total, importResult.done.get))
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/2357d810/s2core/src/main/scala/org/apache/s2graph/core/model/Importer.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/model/Importer.scala 
b/s2core/src/main/scala/org/apache/s2graph/core/model/Importer.scala
new file mode 100644
index 0000000..e3084dd
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/model/Importer.scala
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.s2graph.core.model
+
+import java.io.File
+
+import com.typesafe.config.Config
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.s2graph.core.{Fetcher, S2GraphLike}
+import org.apache.s2graph.core.utils.logger
+
+import scala.concurrent.{ExecutionContext, Future}
+
+object Importer {
+  def toHDFSConfiguration(hdfsConfDir: String): Configuration = {
+    val conf = new Configuration
+
+    val hdfsConfDirectory = new File(hdfsConfDir)
+    if (hdfsConfDirectory.exists()) {
+      if (!hdfsConfDirectory.isDirectory || !hdfsConfDirectory.canRead) {
+        throw new IllegalStateException(s"HDFS configuration directory 
($hdfsConfDirectory) cannot be read.")
+      }
+
+      val path = hdfsConfDirectory.getAbsolutePath
+      conf.addResource(new Path(s"file:///$path/core-site.xml"))
+      conf.addResource(new Path(s"file:///$path/hdfs-site.xml"))
+    } else {
+      logger.warn("RocksDBImporter doesn't have valid hadoop configuration 
directory..")
+    }
+    conf
+  }
+}
+
+trait Importer {
+  @volatile var isFinished: Boolean = false
+
+  def run(config: Config)(implicit ec: ExecutionContext): Future[Importer]
+
+  def status: Boolean = isFinished
+
+  def setStatus(otherStatus: Boolean): Boolean = {
+    this.isFinished = otherStatus
+    this.isFinished
+  }
+
+  def close(): Unit
+}
+
+case class IdentityImporter(graph: S2GraphLike) extends Importer {
+  override def run(config: Config)(implicit ec: ExecutionContext): 
Future[Importer] = {
+    Future.successful(this)
+  }
+
+  override def close(): Unit = {}
+}
+
+object HDFSImporter {
+
+  import scala.collection.JavaConverters._
+
+  val PathsKey = "paths"
+  val HDFSConfDirKey = "hdfsConfDir"
+
+  def extractPaths(config: Config): Map[String, String] = {
+    config.getConfigList(PathsKey).asScala.map { e =>
+      val key = e.getString("src")
+      val value = e.getString("tgt")
+
+      key -> value
+    }.toMap
+  }
+}
+
+case class HDFSImporter(graph: S2GraphLike) extends Importer {
+
+  import HDFSImporter._
+
+  override def run(config: Config)(implicit ec: ExecutionContext): 
Future[Importer] = {
+    Future {
+      val paths = extractPaths(config)
+      val hdfsConfiDir = config.getString(HDFSConfDirKey)
+
+      val hadoopConfig = Importer.toHDFSConfiguration(hdfsConfiDir)
+      val fs = FileSystem.get(hadoopConfig)
+
+      def copyToLocal(remoteSrc: String, localSrc: String): Unit = {
+        val remoteSrcPath = new Path(remoteSrc)
+        val localSrcPath = new Path(localSrc)
+
+        fs.copyToLocalFile(remoteSrcPath, localSrcPath)
+      }
+
+      paths.foreach { case (srcPath, tgtPath) =>
+        copyToLocal(srcPath, tgtPath)
+      }
+
+      this
+    }
+  }
+
+  //  override def status: ImportStatus = ???
+
+  override def close(): Unit = {}
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/2357d810/s2core/src/main/scala/org/apache/s2graph/core/model/MemoryModelFetcher.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/main/scala/org/apache/s2graph/core/model/MemoryModelFetcher.scala 
b/s2core/src/main/scala/org/apache/s2graph/core/model/MemoryModelFetcher.scala
new file mode 100644
index 0000000..2130066
--- /dev/null
+++ 
b/s2core/src/main/scala/org/apache/s2graph/core/model/MemoryModelFetcher.scala
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.s2graph.core.model
+
+import com.typesafe.config.Config
+import org.apache.s2graph.core._
+import org.apache.s2graph.core.types.VertexId
+
+import scala.concurrent.{ExecutionContext, Future}
+
+/**
+  * Reference implementation for the Fetcher interface.
+  * It only produces constant edges.
+  */
+class MemoryModelFetcher(val graph: S2GraphLike) extends Fetcher {
+  val builder = graph.elementBuilder
+  val ranges = (0 until 10)
+
+  override def init(config: Config)(implicit ec: ExecutionContext): 
Future[Fetcher] = {
+    Future.successful(this)
+  }
+
+  override def fetches(queryRequests: Seq[QueryRequest],
+                       prevStepEdges: Map[VertexId, 
Seq[EdgeWithScore]])(implicit ec: ExecutionContext): Future[Seq[StepResult]] = {
+    val stepResultLs = queryRequests.map { queryRequest =>
+      val queryParam = queryRequest.queryParam
+      val edges = ranges.map { ith =>
+        val tgtVertexId = builder.newVertexId(queryParam.label.service, 
queryParam.label.tgtColumnWithDir(queryParam.labelWithDir.dir), ith.toString)
+
+        graph.toEdge(queryRequest.vertex.innerIdVal,
+          tgtVertexId.innerId.value, queryParam.label.label, 
queryParam.direction)
+      }
+
+      val edgeWithScores = edges.map(e => EdgeWithScore(e, 1.0, 
queryParam.label))
+      StepResult(edgeWithScores, Nil, Nil)
+    }
+
+    Future.successful(stepResultLs)
+  }
+
+  override def close(): Unit = {}
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/2357d810/s2core/src/main/scala/org/apache/s2graph/core/model/ModelManager.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/main/scala/org/apache/s2graph/core/model/ModelManager.scala 
b/s2core/src/main/scala/org/apache/s2graph/core/model/ModelManager.scala
new file mode 100644
index 0000000..3cad13c
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/model/ModelManager.scala
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.s2graph.core.model
+
+import com.typesafe.config.Config
+import org.apache.s2graph.core.schema.Label
+import org.apache.s2graph.core.utils.logger
+import org.apache.s2graph.core.{Fetcher, S2GraphLike}
+
+import scala.concurrent.{ExecutionContext, Future}
+
+object ModelManager {
+  val ClassNameKey = "className"
+}
+
+class ModelManager(s2GraphLike: S2GraphLike) {
+
+  import ModelManager._
+
+  private val fetcherPool = scala.collection.mutable.Map.empty[String, Fetcher]
+  private val ImportLock = new java.util.concurrent.ConcurrentHashMap[String, 
Importer]
+
+  def toImportLockKey(label: Label): String = label.label
+
+  def getFetcher(label: Label): Fetcher = {
+    fetcherPool.getOrElse(toImportLockKey(label), throw new 
IllegalStateException(s"$label is not imported."))
+  }
+
+  def initImporter(config: Config): Importer = {
+    val className = config.getString(ClassNameKey)
+
+    Class.forName(className)
+      .getConstructor(classOf[S2GraphLike])
+      .newInstance(s2GraphLike)
+      .asInstanceOf[Importer]
+  }
+
+  def initFetcher(config: Config)(implicit ec: ExecutionContext): 
Future[Fetcher] = {
+    val className = config.getString(ClassNameKey)
+
+    val fetcher = Class.forName(className)
+      .getConstructor(classOf[S2GraphLike])
+      .newInstance(s2GraphLike)
+      .asInstanceOf[Fetcher]
+
+    fetcher.init(config)
+  }
+
+  def importModel(label: Label, config: Config)(implicit ec: 
ExecutionContext): Future[Importer] = {
+    val importer = ImportLock.computeIfAbsent(toImportLockKey(label), new 
java.util.function.Function[String, Importer] {
+      override def apply(k: String): Importer = {
+        val importer = initImporter(config.getConfig("importer"))
+
+        //TODO: Update Label's extra options.
+        importer
+          .run(config.getConfig("importer"))
+          .map { importer =>
+            logger.info(s"Close importer")
+            importer.close()
+
+            initFetcher(config.getConfig("fetcher")).map { fetcher =>
+              importer.setStatus(true)
+
+              fetcherPool
+                .remove(k)
+                .foreach { oldFetcher =>
+                  logger.info(s"Delete old storage ($k) => $oldFetcher")
+                  oldFetcher.close()
+                }
+
+              fetcherPool += (k -> fetcher)
+            }
+          }
+          .onComplete { _ =>
+            logger.info(s"ImportLock release: $k")
+            ImportLock.remove(k)
+          }
+
+        importer
+      }
+    })
+
+    Future.successful(importer)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/2357d810/s2core/src/main/scala/org/apache/s2graph/core/schema/Label.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/schema/Label.scala 
b/s2core/src/main/scala/org/apache/s2graph/core/schema/Label.scala
index 7fb1183..cca1769 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/schema/Label.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/schema/Label.scala
@@ -259,6 +259,20 @@ object Label extends SQLSyntaxSupport[Label] {
     cnt
   }
 
+  def updateOption(labelName: String, options: String)(implicit session: 
DBSession = AutoSession) = {
+    scala.util.Try(Json.parse(options)).getOrElse(throw new 
RuntimeException("invalid Json option"))
+    logger.info(s"update options of label $labelName, ${options}")
+    val cnt = sql"""update labels set options = $options where label = 
$labelName""".update().apply()
+    val label = Label.findByName(labelName, useCache = false).get
+
+    val cacheKeys = List(s"id=${label.id.get}", s"label=${label.label}")
+    cacheKeys.foreach { key =>
+      expireCache(className + key)
+      expireCaches(className + key)
+    }
+    cnt
+  }
+
   def delete(id: Int)(implicit session: DBSession = AutoSession) = {
     val label = findById(id)
     logger.info(s"delete label: $label")
@@ -369,19 +383,6 @@ case class Label(id: Option[Int], label: String,
     prop <- metaProps if LabelMeta.isValidSeq(prop.seq)
     jsValue <- innerValToJsValue(toInnerVal(prop.defaultValue, prop.dataType, 
schemaVersion), prop.dataType)
   } yield prop -> jsValue).toMap
-//  lazy val extraOptions = Model.extraOptions(Option("""{
-//    "storage": {
-//      "s2graph.storage.backend": "rocks",
-//      "rocks.db.path": "/tmp/db"
-//    }
-//  }"""))
-
-  lazy val tokens: Set[String] = 
extraOptions.get("tokens").fold(Set.empty[String]) {
-    case JsArray(tokens) => tokens.map(_.as[String]).toSet
-    case _ =>
-      logger.error("Invalid token JSON")
-      Set.empty[String]
-  }
 
   lazy val extraOptions = Schema.extraOptions(options)
 
@@ -389,8 +390,14 @@ case class Label(id: Option[Int], label: String,
 
   lazy val storageConfigOpt: Option[Config] = toStorageConfig
 
+  lazy val fetchConfigExist: Boolean = toFetcherConfig.isDefined
+
+  def toFetcherConfig: Option[Config] = {
+    Schema.toConfig(extraOptions, "fetcher")
+  }
+
   def toStorageConfig: Option[Config] = {
-    Schema.toStorageConfig(extraOptions)
+    Schema.toConfig(extraOptions, "storage")
   }
 
   def srcColumnWithDir(dir: Int) = {

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/2357d810/s2core/src/main/scala/org/apache/s2graph/core/schema/Schema.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/schema/Schema.scala 
b/s2core/src/main/scala/org/apache/s2graph/core/schema/Schema.scala
index c28df80..50c1b7f 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/schema/Schema.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/schema/Schema.scala
@@ -175,9 +175,9 @@ object Schema {
       }
   }
 
-  def toStorageConfig(options: Map[String, JsValue]): Option[Config] = {
+  def toConfig(options: Map[String, JsValue], key: String): Option[Config] = {
     try {
-      options.get("storage").map { jsValue =>
+      options.get(key).map { jsValue =>
         import scala.collection.JavaConverters._
         val configMap = jsValue.as[JsObject].fieldSet.toMap.map { case (key, 
value) =>
           key -> JSONParser.jsValueToAny(value).getOrElse(throw new 
RuntimeException("!!"))

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/2357d810/s2core/src/main/scala/org/apache/s2graph/core/schema/Service.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/schema/Service.scala 
b/s2core/src/main/scala/org/apache/s2graph/core/schema/Service.scala
index 611a746..dbbfed7 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/schema/Service.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/schema/Service.scala
@@ -129,5 +129,5 @@ case class Service(id: Option[Int],
   lazy val extraOptions = Schema.extraOptions(options)
   lazy val storageConfigOpt: Option[Config] = toStorageConfig
   def serviceColumns(useCache: Boolean): Seq[ServiceColumn] = 
ServiceColumn.findByServiceId(id.get, useCache = useCache)
-  def toStorageConfig: Option[Config] = Schema.toStorageConfig(extraOptions)
+  def toStorageConfig: Option[Config] = Schema.toConfig(extraOptions, 
"storage")
 }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/2357d810/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala 
b/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
index 18f6b1e..d2500a6 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
@@ -22,7 +22,7 @@ package org.apache.s2graph.core.storage
 
 import com.typesafe.config.Config
 import org.apache.s2graph.core._
-import org.apache.s2graph.core.storage.serde.{Deserializable, MutationHelper}
+import org.apache.s2graph.core.storage.serde.Deserializable
 import 
org.apache.s2graph.core.storage.serde.indexedge.tall.IndexEdgeDeserializable
 import org.apache.s2graph.core.types._
 
@@ -33,14 +33,11 @@ abstract class Storage(val graph: S2GraphLike,
   /* Storage backend specific resource management */
   val management: StorageManagement
 
-  /* Physically store given KeyValue into backend storage. */
-  val mutator: StorageWritable
-
   /*
    * Given QueryRequest/Vertex/Edge, fetch KeyValue from storage
    * then convert them into Edge/Vertex
    */
-  val fetcher: StorageReadable
+  val reader: StorageReadable
 
   /*
    * Serialize Edge/Vertex, to common KeyValue, SKeyValue that
@@ -50,6 +47,11 @@ abstract class Storage(val graph: S2GraphLike,
   val serDe: StorageSerDe
 
   /*
+   * Responsible to connect physical storage backend to store 
GraphElement(Edge/Vertex).
+   */
+  val mutator: Mutator
+
+  /*
    * Common helper to translate SKeyValue to Edge/Vertex and vice versa.
    * Note that it require storage backend specific implementation for 
serialize/deserialize.
    */
@@ -60,31 +62,24 @@ abstract class Storage(val graph: S2GraphLike,
    * Note that it require storage backend specific implementations for
    * all of StorageWritable, StorageReadable, StorageSerDe, StorageIO
    */
-  lazy val conflictResolver: WriteWriteConflictResolver = new 
WriteWriteConflictResolver(graph, serDe, io, mutator, fetcher)
-
-  lazy val mutationHelper: MutationHelper = new MutationHelper(this)
-
-  /** Mutation **/
-  def writeToStorage(cluster: String, kvs: Seq[SKeyValue], withWait: 
Boolean)(implicit ec: ExecutionContext): Future[MutateResponse] =
-    mutator.writeToStorage(cluster, kvs, withWait)
+//  lazy val conflictResolver: WriteWriteConflictResolver = new 
WriteWriteConflictResolver(graph, serDe, io, mutator, reader)
+//  lazy val mutationHelper: MutationHelper = new MutationHelper(this)
 
-  def writeLock(requestKeyValue: SKeyValue, expectedOpt: 
Option[SKeyValue])(implicit ec: ExecutionContext): Future[MutateResponse] =
-    mutator.writeLock(requestKeyValue, expectedOpt)
 
   /** Fetch **/
   def fetches(queryRequests: Seq[QueryRequest],
               prevStepEdges: Map[VertexId, Seq[EdgeWithScore]])(implicit ec: 
ExecutionContext): Future[Seq[StepResult]] =
-    fetcher.fetches(queryRequests, prevStepEdges)
+    reader.fetches(queryRequests, prevStepEdges)
 
   def fetchVertices(vertices: Seq[S2VertexLike])(implicit ec: 
ExecutionContext): Future[Seq[S2VertexLike]] =
-    fetcher.fetchVertices(vertices)
+    reader.fetchVertices(vertices)
 
-  def fetchEdgesAll()(implicit ec: ExecutionContext): Future[Seq[S2EdgeLike]] 
= fetcher.fetchEdgesAll()
+  def fetchEdgesAll()(implicit ec: ExecutionContext): Future[Seq[S2EdgeLike]] 
= reader.fetchEdgesAll()
 
-  def fetchVerticesAll()(implicit ec: ExecutionContext): 
Future[Seq[S2VertexLike]] = fetcher.fetchVerticesAll()
+  def fetchVerticesAll()(implicit ec: ExecutionContext): 
Future[Seq[S2VertexLike]] = reader.fetchVerticesAll()
 
   def fetchSnapshotEdgeInner(edge: S2EdgeLike)(implicit ec: ExecutionContext): 
Future[(Option[S2EdgeLike], Option[SKeyValue])] =
-    fetcher.fetchSnapshotEdgeInner(edge)
+    reader.fetchSnapshotEdgeInner(edge)
 
   /** Management **/
   def flush(): Unit = management.flush()
@@ -102,21 +97,21 @@ abstract class Storage(val graph: S2GraphLike,
   def deleteAllFetchedEdgesAsyncOld(stepInnerResult: StepResult,
                                     requestTs: Long,
                                     retryNum: Int)(implicit ec: 
ExecutionContext): Future[Boolean] =
-    mutationHelper.deleteAllFetchedEdgesAsyncOld(stepInnerResult, requestTs, 
retryNum)
+    mutator.deleteAllFetchedEdgesAsyncOld(stepInnerResult, requestTs, retryNum)
 
   def mutateVertex(zkQuorum: String, vertex: S2VertexLike, withWait: 
Boolean)(implicit ec: ExecutionContext): Future[MutateResponse] =
-    mutationHelper.mutateVertex(zkQuorum: String, vertex, withWait)
+    mutator.mutateVertex(zkQuorum: String, vertex, withWait)
 
   def mutateStrongEdges(zkQuorum: String, _edges: Seq[S2EdgeLike], withWait: 
Boolean)(implicit ec: ExecutionContext): Future[Seq[Boolean]] =
-    mutationHelper.mutateStrongEdges(zkQuorum, _edges, withWait)
+    mutator.mutateStrongEdges(zkQuorum, _edges, withWait)
 
 
   def mutateWeakEdges(zkQuorum: String, _edges: Seq[S2EdgeLike], withWait: 
Boolean)(implicit ec: ExecutionContext): Future[Seq[(Int, Boolean)]] =
-    mutationHelper.mutateWeakEdges(zkQuorum, _edges, withWait)
+    mutator.mutateWeakEdges(zkQuorum, _edges, withWait)
 
   def incrementCounts(zkQuorum: String, edges: Seq[S2EdgeLike], withWait: 
Boolean)(implicit ec: ExecutionContext): Future[Seq[MutateResponse]] =
-    mutationHelper.incrementCounts(zkQuorum, edges, withWait)
+    mutator.incrementCounts(zkQuorum, edges, withWait)
 
   def updateDegree(zkQuorum: String, edge: S2EdgeLike, degreeVal: Long = 
0)(implicit ec: ExecutionContext): Future[MutateResponse] =
-    mutationHelper.updateDegree(zkQuorum, edge, degreeVal)
+    mutator.updateDegree(zkQuorum, edge, degreeVal)
 }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/2357d810/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageReadable.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageReadable.scala 
b/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageReadable.scala
index 0965f68..b10feb9 100644
--- 
a/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageReadable.scala
+++ 
b/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageReadable.scala
@@ -19,6 +19,7 @@
 
 package org.apache.s2graph.core.storage
 
+import com.typesafe.config.Config
 import org.apache.s2graph.core.GraphExceptions.FetchTimeoutException
 import org.apache.s2graph.core._
 import org.apache.s2graph.core.types.VertexId
@@ -26,18 +27,18 @@ import org.apache.s2graph.core.utils.logger
 
 import scala.concurrent.{ExecutionContext, Future}
 
-trait StorageReadable {
+trait StorageReadable extends Fetcher {
   val io: StorageIO
   val serDe: StorageSerDe
- /**
-    * responsible to fire parallel fetch call into storage and create future 
that will return merged result.
-    *
-    * @param queryRequests
-    * @param prevStepEdges
-    * @return
-    */
-  def fetches(queryRequests: Seq[QueryRequest],
-              prevStepEdges: Map[VertexId, Seq[EdgeWithScore]])(implicit ec: 
ExecutionContext): Future[Seq[StepResult]]
+// /**
+//    * responsible to fire parallel fetch call into storage and create future 
that will return merged result.
+//    *
+//    * @param queryRequests
+//    * @param prevStepEdges
+//    * @return
+//    */
+//  def fetches(queryRequests: Seq[QueryRequest],
+//              prevStepEdges: Map[VertexId, Seq[EdgeWithScore]])(implicit ec: 
ExecutionContext): Future[Seq[StepResult]]
 
   def fetchEdgesAll()(implicit ec: ExecutionContext): Future[Seq[S2EdgeLike]]
 
@@ -92,4 +93,5 @@ trait StorageReadable {
 
     Future.sequence(futures).map(_.flatten)
   }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/2357d810/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageWritable.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageWritable.scala 
b/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageWritable.scala
index 80da3a9..8c2fb27 100644
--- 
a/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageWritable.scala
+++ 
b/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageWritable.scala
@@ -19,20 +19,21 @@
 
 package org.apache.s2graph.core.storage
 
+import org.apache.s2graph.core.Mutator
+
 import scala.concurrent.{ExecutionContext, Future}
 
-trait StorageWritable {
+trait OptimisticMutator extends Mutator {
   /**
     * decide how to store given key values Seq[SKeyValue] into storage using 
storage's client.
     * note that this should be return true on all success.
     * we assumes that each storage implementation has client as member 
variable.
     *
-    *
-    * @param cluster: where this key values should be stored.
-    * @param kvs: sequence of SKeyValue that need to be stored in storage.
-    * @param withWait: flag to control wait ack from storage.
-    *                  note that in AsynchbaseStorage(which support 
asynchronous operations), even with true,
-    *                  it never block thread, but rather submit work and 
notified by event loop when storage send ack back.
+    * @param cluster  : where these key values should be stored.
+    * @param kvs      : sequence of SKeyValue that need to be stored in storage.
+    * @param withWait : flag to control waiting for an ack from storage.
+    *                 note that in AsynchbaseStorage (which supports asynchronous operations), even with true,
+    *                 it never blocks the thread, but rather submits work and is notified by the event loop when storage sends the ack back.
     * @return ack message from storage.
     */
   def writeToStorage(cluster: String, kvs: Seq[SKeyValue], withWait: 
Boolean)(implicit ec: ExecutionContext): Future[MutateResponse]
@@ -55,10 +56,10 @@ trait StorageWritable {
     * for storage that does not support concurrency control, then storage 
implementation
     * itself can maintain manual locks that synchronize 
read(fetchSnapshotEdgeKeyValues)
     * and write(writeLock).
+    *
     * @param requestKeyValue
     * @param expectedOpt
     * @return
     */
   def writeLock(requestKeyValue: SKeyValue, expectedOpt: 
Option[SKeyValue])(implicit ec: ExecutionContext): Future[MutateResponse]
-
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/2357d810/s2core/src/main/scala/org/apache/s2graph/core/storage/WriteWriteConflictResolver.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/main/scala/org/apache/s2graph/core/storage/WriteWriteConflictResolver.scala
 
b/s2core/src/main/scala/org/apache/s2graph/core/storage/WriteWriteConflictResolver.scala
index dcef1cc..bfc5bc6 100644
--- 
a/s2core/src/main/scala/org/apache/s2graph/core/storage/WriteWriteConflictResolver.scala
+++ 
b/s2core/src/main/scala/org/apache/s2graph/core/storage/WriteWriteConflictResolver.scala
@@ -31,7 +31,7 @@ import scala.util.Random
 class WriteWriteConflictResolver(graph: S2GraphLike,
                                  serDe: StorageSerDe,
                                  io: StorageIO,
-                                 mutator: StorageWritable,
+                                 mutator: OptimisticMutator,
                                  fetcher: StorageReadable) {
   val BackoffTimeout = graph.BackoffTimeout
   val MaxRetryNum = graph.MaxRetryNum

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/2357d810/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
 
b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
index 8b3d862..4be3767 100644
--- 
a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
+++ 
b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
@@ -151,17 +151,9 @@ class AsynchbaseStorage(override val graph: S2GraphLike,
 
   override val management: StorageManagement = new 
AsynchbaseStorageManagement(config, clients)
 
-  override val mutator: StorageWritable = new 
AsynchbaseStorageWritable(client, clientWithFlush)
-
   override val serDe: StorageSerDe = new AsynchbaseStorageSerDe(graph)
 
-  override val fetcher: StorageReadable = new AsynchbaseStorageReadable(graph, 
config, client, serDe, io)
-
-  //  val hbaseExecutor: ExecutorService  =
-  //    if (config.getString("hbase.zookeeper.quorum") == "localhost")
-  //      AsynchbaseStorage.initLocalHBase(config)
-  //    else
-  //      null
-
+  override val reader: StorageReadable = new AsynchbaseStorageReadable(graph, 
config, client, serDe, io)
 
+  override val mutator: Mutator = new AsynchbaseStorageWritable(graph, serDe, 
reader, client, clientWithFlush)
 }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/2357d810/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageWritable.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageWritable.scala
 
b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageWritable.scala
index 7ca3782..b4236b9 100644
--- 
a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageWritable.scala
+++ 
b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageWritable.scala
@@ -20,14 +20,19 @@
 package org.apache.s2graph.core.storage.hbase
 
 import org.apache.hadoop.hbase.util.Bytes
-import org.apache.s2graph.core.storage.{IncrementResponse, MutateResponse, 
SKeyValue, StorageWritable}
+import org.apache.s2graph.core.S2GraphLike
+import org.apache.s2graph.core.storage._
 import org.apache.s2graph.core.utils.{Extensions, logger}
 import org.hbase.async.{AtomicIncrementRequest, DeleteRequest, HBaseClient, 
PutRequest}
+
 import scala.collection.mutable.ArrayBuffer
 import scala.concurrent.{ExecutionContext, Future}
 
-class AsynchbaseStorageWritable(val client: HBaseClient,
-                                val clientWithFlush: HBaseClient) extends 
StorageWritable {
+class AsynchbaseStorageWritable(val graph: S2GraphLike,
+                                val serDe: StorageSerDe,
+                                val reader: StorageReadable,
+                                val client: HBaseClient,
+                                val clientWithFlush: HBaseClient) extends 
DefaultOptimisticMutator(graph, serDe, reader) {
   import Extensions.DeferOps
 
   private def client(withWait: Boolean): HBaseClient = if (withWait) 
clientWithFlush else client

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/2357d810/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorage.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorage.scala
 
b/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorage.scala
index 11fae17..b24e375 100644
--- 
a/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorage.scala
+++ 
b/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorage.scala
@@ -26,7 +26,7 @@ import com.google.common.cache.{CacheBuilder, CacheLoader, 
LoadingCache}
 import com.google.common.hash.Hashing
 import com.typesafe.config.Config
 import org.apache.s2graph.core._
-import org.apache.s2graph.core.storage.Storage
+import org.apache.s2graph.core.storage.{Storage, StorageManagement, 
StorageReadable, StorageSerDe}
 import org.apache.s2graph.core.storage.rocks.RocksHelper.RocksRPC
 import org.apache.s2graph.core.utils.logger
 import org.rocksdb._
@@ -150,11 +150,12 @@ class RocksStorage(override val graph: S2GraphLike,
     .maximumSize(1000 * 10 * 10 * 10 * 10)
     .build[String, ReentrantLock](cacheLoader)
 
-  override val management = new RocksStorageManagement(config, vdb, db)
+  override val management: StorageManagement = new 
RocksStorageManagement(config, vdb, db)
 
-  override val mutator = new RocksStorageWritable(db, vdb, lockMap)
+  override val serDe: StorageSerDe = new RocksStorageSerDe(graph)
 
-  override val serDe = new RocksStorageSerDe(graph)
+  override val reader: StorageReadable = new RocksStorageReadable(graph, 
config, db, vdb, serDe, io)
+
+  override val mutator: Mutator = new RocksStorageWritable(graph, serDe, 
reader, db, vdb, lockMap)
 
-  override val fetcher = new RocksStorageReadable(graph, config, db, vdb, 
serDe, io)
 }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/2357d810/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorageReadable.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorageReadable.scala
 
b/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorageReadable.scala
index 5db02cc..27e3efd 100644
--- 
a/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorageReadable.scala
+++ 
b/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorageReadable.scala
@@ -27,7 +27,7 @@ import org.apache.s2graph.core._
 import org.apache.s2graph.core.schema.{Label, ServiceColumn}
 import org.apache.s2graph.core.storage.rocks.RocksHelper.{GetRequest, 
RocksRPC, ScanWithRange}
 import org.apache.s2graph.core.storage.serde.StorageSerializable
-import org.apache.s2graph.core.storage.{SKeyValue, StorageIO, StorageReadable, 
StorageSerDe}
+import org.apache.s2graph.core.storage._
 import org.apache.s2graph.core.types.{HBaseType, VertexId}
 import org.rocksdb.RocksDB
 

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/2357d810/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorageWritable.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorageWritable.scala
 
b/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorageWritable.scala
index 7ec147d..d29ccce 100644
--- 
a/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorageWritable.scala
+++ 
b/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorageWritable.scala
@@ -23,15 +23,19 @@ import java.util.concurrent.locks.ReentrantLock
 
 import com.google.common.cache.{Cache, LoadingCache}
 import org.apache.hadoop.hbase.util.Bytes
-import org.apache.s2graph.core.storage.{MutateResponse, SKeyValue, 
StorageWritable}
+import org.apache.s2graph.core.S2GraphLike
+import org.apache.s2graph.core.storage._
 import org.apache.s2graph.core.utils.logger
 import org.rocksdb.{RocksDB, RocksDBException, WriteBatch, WriteOptions}
 
 import scala.concurrent.{ExecutionContext, Future}
 
-class RocksStorageWritable(val db: RocksDB,
+class RocksStorageWritable(val graph: S2GraphLike,
+                           val serDe: StorageSerDe,
+                           val reader: StorageReadable,
+                           val db: RocksDB,
                            val vdb: RocksDB,
-                           val lockMap: LoadingCache[String, ReentrantLock]) 
extends StorageWritable {
+                           val lockMap: LoadingCache[String, ReentrantLock]) 
extends DefaultOptimisticMutator(graph, serDe, reader) {
 
   override def writeToStorage(cluster: String, kvs: Seq[SKeyValue], withWait: 
Boolean)(implicit ec: ExecutionContext) = {
     if (kvs.isEmpty) {

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/2357d810/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/MutationHelper.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/MutationHelper.scala
 
b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/MutationHelper.scala
index 0748efb..8cd32d4 100644
--- 
a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/MutationHelper.scala
+++ 
b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/MutationHelper.scala
@@ -16,25 +16,23 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-
-package org.apache.s2graph.core.storage.serde
-
-import org.apache.s2graph.core.schema.LabelMeta
+package org.apache.s2graph.core.storage
 import org.apache.s2graph.core._
-import org.apache.s2graph.core.storage._
+import org.apache.s2graph.core.schema.LabelMeta
 import org.apache.s2graph.core.utils.logger
 
 import scala.concurrent.{ExecutionContext, Future}
 
-class MutationHelper(storage: Storage) {
-  val serDe = storage.serDe
-  val io = storage.io
-  val fetcher = storage.fetcher
-  val mutator = storage.mutator
-  val conflictResolver = storage.conflictResolver
+abstract class DefaultOptimisticMutator(graph: S2GraphLike,
+                                        serDe: StorageSerDe,
+                                        reader: StorageReadable) extends 
OptimisticMutator {
+  val fetcher = reader
+
+  lazy val io: StorageIO = new StorageIO(graph, serDe)
+  lazy val conflictResolver: WriteWriteConflictResolver = new 
WriteWriteConflictResolver(graph, serDe, io, this, reader)
 
-  private def writeToStorage(cluster: String, kvs: Seq[SKeyValue], withWait: 
Boolean)(implicit ec: ExecutionContext): Future[MutateResponse] =
-    mutator.writeToStorage(cluster, kvs, withWait)
+//  private def writeToStorage(cluster: String, kvs: Seq[SKeyValue], withWait: 
Boolean)(implicit ec: ExecutionContext): Future[MutateResponse] =
+//    mutator.writeToStorage(cluster, kvs, withWait)
 
   def deleteAllFetchedEdgesAsyncOld(stepInnerResult: StepResult,
                                     requestTs: Long,
@@ -93,7 +91,7 @@ class MutationHelper(storage: Storage) {
 
       val (bufferIncr, nonBufferIncr) = io.increments(edgeUpdate.deepCopy)
 
-      if (bufferIncr.nonEmpty) storage.writeToStorage(zkQuorum, bufferIncr, 
withWait = false)
+      if (bufferIncr.nonEmpty) writeToStorage(zkQuorum, bufferIncr, withWait = 
false)
       io.buildVertexPutsAsync(edge) ++ 
io.indexedEdgeMutations(edgeUpdate.deepCopy) ++ 
io.snapshotEdgeMutations(edgeUpdate.deepCopy) ++ nonBufferIncr
     }
 
@@ -152,7 +150,7 @@ class MutationHelper(storage: Storage) {
       }
 
       val composed = for {
-      //        deleteRet <- Future.sequence(deleteAllFutures)
+        //        deleteRet <- Future.sequence(deleteAllFutures)
         mutateRet <- Future.sequence(mutateEdgeFutures)
       } yield mutateRet
 
@@ -185,6 +183,6 @@ class MutationHelper(storage: Storage) {
   def updateDegree(zkQuorum: String, edge: S2EdgeLike, degreeVal: Long = 
0)(implicit ec: ExecutionContext): Future[MutateResponse] = {
     val kvs = io.buildDegreePuts(edge, degreeVal)
 
-    mutator.writeToStorage(zkQuorum, kvs, withWait = true)
+    writeToStorage(zkQuorum, kvs, withWait = true)
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/2357d810/s2core/src/test/scala/org/apache/s2graph/core/model/FetcherTest.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/test/scala/org/apache/s2graph/core/model/FetcherTest.scala 
b/s2core/src/test/scala/org/apache/s2graph/core/model/FetcherTest.scala
new file mode 100644
index 0000000..6c76cdf
--- /dev/null
+++ b/s2core/src/test/scala/org/apache/s2graph/core/model/FetcherTest.scala
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.s2graph.core.model
+
+import com.typesafe.config.ConfigFactory
+import org.apache.s2graph.core.Integrate.IntegrateCommon
+import org.apache.s2graph.core.Management.JsonModel.{Index, Prop}
+import org.apache.s2graph.core.schema.Label
+import org.apache.s2graph.core.{Query, QueryParam}
+
+import scala.concurrent.duration.Duration
+import scala.concurrent.{Await, ExecutionContext}
+
+class FetcherTest extends IntegrateCommon {
+
+  import scala.collection.JavaConverters._
+
+  test("MemoryModelFetcher") {
+    // 1. create label.
+    // 2. importLabel.
+    // 3. fetch.
+    val service = management.createService("s2graph", "localhost", 
"s2graph_htable", -1, None).get
+    val serviceColumn =
+      management.createServiceColumn("s2graph", "user", "string", 
Seq(Prop("age", "0", "int", true)))
+    val labelName = "fetcher_test"
+    val options =
+      s"""{
+         |
+                     | "importer": {
+         |   "${ModelManager.ClassNameKey}": 
"org.apache.s2graph.core.model.IdentityImporter"
+         | },
+         | "fetcher": {
+         |   "${ModelManager.ClassNameKey}": 
"org.apache.s2graph.core.model.MemoryModelFetcher"
+         | }
+         |}""".stripMargin
+
+    Label.findByName(labelName, useCache = false).foreach { label => 
Label.delete(label.id.get) }
+
+    val label = management.createLabel(
+      labelName,
+      serviceColumn,
+      serviceColumn,
+      true,
+      service.serviceName,
+      Seq.empty[Index].asJava,
+      Seq.empty[Prop].asJava,
+      "strong",
+      null,
+      -1,
+      "v3",
+      "gz",
+      options
+    )
+    val config = ConfigFactory.parseString(options)
+    val importerFuture = graph.modelManager.importModel(label, 
config)(ExecutionContext.Implicits.global)
+    Await.ready(importerFuture, Duration("60 seconds"))
+
+    Thread.sleep(1000)
+
+    val vertex = graph.elementBuilder.toVertex(service.serviceName, 
serviceColumn.columnName, "daewon")
+    val queryParam = QueryParam(labelName = labelName)
+
+    val query = Query.toQuery(srcVertices = Seq(vertex), queryParams = 
Seq(queryParam))
+    val stepResult = Await.result(graph.getEdges(query), Duration("60 
seconds"))
+
+    stepResult.edgeWithScores.foreach { es =>
+      println(es.edge)
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/2357d810/s2graphql/src/main/scala/org/apache/s2graph/graphql/GraphQLServer.scala
----------------------------------------------------------------------
diff --git 
a/s2graphql/src/main/scala/org/apache/s2graph/graphql/GraphQLServer.scala 
b/s2graphql/src/main/scala/org/apache/s2graph/graphql/GraphQLServer.scala
index 391a99f..0bf62d9 100644
--- a/s2graphql/src/main/scala/org/apache/s2graph/graphql/GraphQLServer.scala
+++ b/s2graphql/src/main/scala/org/apache/s2graph/graphql/GraphQLServer.scala
@@ -37,7 +37,7 @@ import sangria.execution.deferred.DeferredResolver
 import sangria.marshalling.sprayJson._
 import sangria.parser.QueryParser
 import sangria.schema.Schema
-import spray.json.{JsObject, JsString}
+import spray.json.{JsBoolean, JsObject, JsString}
 
 import scala.collection.JavaConverters._
 import scala.concurrent.ExecutionContext
@@ -63,8 +63,22 @@ object GraphQLServer {
 
   val schemaCache = new SafeUpdateCache(schemaConfig)
 
-  def endpoint(requestJSON: spray.json.JsValue)(implicit e: ExecutionContext): 
Route = {
+  def importModel(requestJSON: spray.json.JsValue)(implicit e: 
ExecutionContext): Route = {
+    val ret = Try {
+      val spray.json.JsObject(fields) = requestJSON
+      val spray.json.JsString(labelName) = fields("label")
+      val jsOptions = fields("options")
+
+      s2graph.management.importModel(labelName, jsOptions.compactPrint)
+    }
 
+    ret match {
+      case Success(f) => complete(f.map(i => OK -> JsString("start")))
+      case Failure(e) => complete(InternalServerError -> 
spray.json.JsObject("message" -> JsString(e.toString)))
+    }
+  }
+
+  def endpoint(requestJSON: spray.json.JsValue)(implicit e: ExecutionContext): 
Route = {
     val spray.json.JsObject(fields) = requestJSON
     val spray.json.JsString(query) = fields("query")
 

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/2357d810/s2graphql/src/main/scala/org/apache/s2graph/graphql/HttpServer.scala
----------------------------------------------------------------------
diff --git 
a/s2graphql/src/main/scala/org/apache/s2graph/graphql/HttpServer.scala 
b/s2graphql/src/main/scala/org/apache/s2graph/graphql/HttpServer.scala
index 685e87b..38cdce3 100644
--- a/s2graphql/src/main/scala/org/apache/s2graph/graphql/HttpServer.scala
+++ b/s2graphql/src/main/scala/org/apache/s2graph/graphql/HttpServer.scala
@@ -44,6 +44,8 @@ object Server extends App {
 
   val route: Flow[HttpRequest, HttpResponse, Any] = (post & path("graphql")) {
     entity(as[spray.json.JsValue])(GraphQLServer.endpoint)
+  } ~ (post & path("importModel")) {
+    entity(as[spray.json.JsValue])(GraphQLServer.importModel)
   } ~ {
     getFromResource("assets/graphiql.html")
   }

Reply via email to