Repository: incubator-s2graph
Updated Branches:
  refs/heads/master 33f4d0550 -> 33e3d267e


- abstract traversing edges as Fetcher interface.


Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/72c35a39
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/72c35a39
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/72c35a39

Branch: refs/heads/master
Commit: 72c35a39e9f739d6df941d86db546811c9cb8a2a
Parents: e674a25
Author: DO YUNG YOON <[email protected]>
Authored: Thu Apr 26 14:26:06 2018 +0900
Committer: DO YUNG YOON <[email protected]>
Committed: Thu Apr 26 14:26:06 2018 +0900

----------------------------------------------------------------------
 s2core/build.sbt                                |   3 +-
 .../scala/org/apache/s2graph/core/Fetcher.scala |  17 +++
 .../org/apache/s2graph/core/Management.scala    |  15 ++-
 .../org/apache/s2graph/core/QueryResult.scala   |   4 +-
 .../scala/org/apache/s2graph/core/S2Graph.scala |  13 ++
 .../org/apache/s2graph/core/S2GraphLike.scala   |   7 +
 .../apache/s2graph/core/TraversalHelper.scala   |   4 +-
 .../s2graph/core/model/AnnoyModelFetcher.scala  |  87 ++++++++++++
 .../s2graph/core/model/ImportStatus.scala       |  40 ++++++
 .../apache/s2graph/core/model/Importer.scala    |  99 ++++++++++++++
 .../s2graph/core/model/MemoryModelFetcher.scala |  41 ++++++
 .../s2graph/core/model/ModelManager.scala       |  87 ++++++++++++
 .../org/apache/s2graph/core/schema/Label.scala  |  20 +--
 .../org/apache/s2graph/core/schema/Schema.scala |   4 +-
 .../apache/s2graph/core/schema/Service.scala    |   2 +-
 .../apache/s2graph/core/storage/Storage.scala   |  14 +-
 .../s2graph/core/storage/StorageReadable.scala  |  22 +--
 .../core/storage/hbase/AsynchbaseStorage.scala  |   9 +-
 .../core/storage/rocks/RocksStorage.scala       |   2 +-
 .../storage/rocks/RocksStorageReadable.scala    |   2 +-
 .../core/storage/serde/MutationHelper.scala     |   2 +-
 .../apache/s2graph/core/model/FetcherTest.scala | 134 +++++++++++++++++++
 .../s2graph/core/model/HDFSImporterTest.scala   |  80 +++++++++++
 23 files changed, 656 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/72c35a39/s2core/build.sbt
----------------------------------------------------------------------
diff --git a/s2core/build.sbt b/s2core/build.sbt
index cc70e97..6e062cc 100644
--- a/s2core/build.sbt
+++ b/s2core/build.sbt
@@ -55,7 +55,8 @@ libraryDependencies ++= Seq(
   "com.sksamuel.elastic4s" %% "elastic4s-core" % elastic4sVersion 
excludeLogging(),
   "com.sksamuel.elastic4s" %% "elastic4s-http" % elastic4sVersion 
excludeLogging(),
   "com.sksamuel.elastic4s" %% "elastic4s-embedded" % elastic4sVersion 
excludeLogging(),
-  "org.scala-lang.modules" %% "scala-pickling" % "0.10.1"
+  "org.scala-lang.modules" %% "scala-pickling" % "0.10.1",
+  "com.spotify" % "annoy" % "0.2.5"
 )
 
 libraryDependencies := {

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/72c35a39/s2core/src/main/scala/org/apache/s2graph/core/Fetcher.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/Fetcher.scala 
b/s2core/src/main/scala/org/apache/s2graph/core/Fetcher.scala
new file mode 100644
index 0000000..737beb3
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/Fetcher.scala
@@ -0,0 +1,17 @@
+package org.apache.s2graph.core
+
+import com.typesafe.config.Config
+import org.apache.s2graph.core.types.VertexId
+
+import scala.concurrent.{ExecutionContext, Future}
+
+/**
+  * Source of edges for a traversal step.
+  *
+  * TraversalHelper obtains one of these through graph.getFetcher(label) and
+  * calls fetches() on it, so an implementation does not have to be backed
+  * by a Storage.
+  */
+trait Fetcher {
+
+  /**
+    * Prepares this fetcher from the given configuration.
+    *
+    * The default implementation ignores the configuration and returns an
+    * already completed future holding this instance.
+    */
+  def init(config: Config)(implicit ec: ExecutionContext): Future[Fetcher] =
+    Future.successful(this)
+
+  /**
+    * Resolves a batch of query requests into step results.
+    *
+    * TraversalHelper zips the returned sequence with the indices of the
+    * requests it passed in, so presumably exactly one StepResult per
+    * request, in request order, is expected -- TODO confirm.
+    *
+    * @param queryRequests requests to resolve
+    * @param prevStepEdges edges of the previous step, keyed by vertex id
+    */
+  def fetches(queryRequests: Seq[QueryRequest],
+              prevStepEdges: Map[VertexId, Seq[EdgeWithScore]])(implicit ec: 
ExecutionContext): Future[Seq[StepResult]]
+
+  /** Releases whatever this fetcher holds; does nothing by default. */
+  def close(): Unit = {}
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/72c35a39/s2core/src/main/scala/org/apache/s2graph/core/Management.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/Management.scala 
b/s2core/src/main/scala/org/apache/s2graph/core/Management.scala
index d026e5b..868fac7 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/Management.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/Management.scala
@@ -20,6 +20,7 @@
 package org.apache.s2graph.core
 
 import java.util
+import java.util.concurrent.Executors
 
 import com.typesafe.config.{Config, ConfigFactory}
 import org.apache.s2graph.core.GraphExceptions.{InvalidHTableException, 
LabelAlreadyExistException, LabelNameTooLongException, LabelNotExistException}
@@ -28,8 +29,10 @@ import org.apache.s2graph.core.schema._
 import org.apache.s2graph.core.types.HBaseType._
 import org.apache.s2graph.core.types._
 import org.apache.s2graph.core.JSONParser._
+import org.apache.s2graph.core.model.Importer
 import play.api.libs.json._
 
+import scala.concurrent.{ExecutionContext, Future}
 import scala.util.Try
 
 /**
@@ -70,7 +73,6 @@ object Management {
     case class Index(name: String, propNames: Seq[String], direction: 
Option[Int] = None, options: Option[String] = None)
   }
 
-
   def findService(serviceName: String) = {
     Service.findByName(serviceName, useCache = false)
   }
@@ -298,9 +300,18 @@ object Management {
 
 class Management(graph: S2GraphLike) {
 
+  val importEx = 
ExecutionContext.fromExecutor(Executors.newSingleThreadExecutor())
 
   import Management._
-  import scala.collection.JavaConversions._
+
+  /**
+    * Starts importing the model configured on the given label.
+    *
+    * The work is handed to graph.modelManager and runs on importEx.
+    *
+    * @param labelName name of the label whose model should be imported
+    * @return the future produced by ModelManager.importModel
+    * @throws LabelNotExistException   if no label has this name
+    * @throws IllegalArgumentException if the label has no fetcher configuration
+    */
+  def importModel(labelName: String): Future[Importer] = {
+    val targetLabel = Label.findByName(labelName) match {
+      case Some(found) => found
+      case None => throw new LabelNotExistException(labelName)
+    }
+
+    val fetcherConfig = targetLabel.toFetcherConfig match {
+      case Some(conf) => conf
+      case None =>
+        throw new IllegalArgumentException(s"${targetLabel.label} is not importable since there is no configuration on label.")
+    }
+
+    graph.modelManager.importModel(targetLabel, fetcherConfig)(importEx)
+  }
 
   def createStorageTable(zkAddr: String,
                   tableName: String,

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/72c35a39/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala 
b/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala
index be57017..4a1018f 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala
@@ -237,7 +237,7 @@ object StepResult {
 
           //          val newOrderByValues = 
updateScoreOnOrderByValues(globalQueryOption.scoreFieldIdx, t.orderByValues, 
newScore)
           val newOrderByValues =
-            if (globalQueryOption.orderByKeys.isEmpty) (newScore, 
t.edge.getTsInnerValValue(), None, None)
+            if (globalQueryOption.orderByKeys.isEmpty) (newScore, 
t.edge.getTs(), None, None)
             else toTuple4(newT.toValues(globalQueryOption.orderByKeys))
 
           val newGroupByValues = newT.toValues(globalQueryOption.groupBy.keys)
@@ -262,7 +262,7 @@ object StepResult {
 //            val newOrderByValues = 
updateScoreOnOrderByValues(globalQueryOption.scoreFieldIdx, t.orderByValues, 
newScore)
 
             val newOrderByValues =
-              if (globalQueryOption.orderByKeys.isEmpty) (newScore, 
t.edge.getTsInnerValValue(), None, None)
+              if (globalQueryOption.orderByKeys.isEmpty) (newScore, 
t.edge.getTs(), None, None)
               else toTuple4(newT.toValues(globalQueryOption.orderByKeys))
 
             val newGroupByValues = 
newT.toValues(globalQueryOption.groupBy.keys)

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/72c35a39/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala 
b/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
index 7816a63..43ab92c 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
@@ -27,6 +27,7 @@ import com.typesafe.config.{Config, ConfigFactory}
 import org.apache.commons.configuration.{BaseConfiguration, Configuration}
 import org.apache.s2graph.core.index.IndexProvider
 import org.apache.s2graph.core.io.tinkerpop.optimize.S2GraphStepStrategy
+import org.apache.s2graph.core.model.ModelManager
 import org.apache.s2graph.core.schema._
 import org.apache.s2graph.core.storage.hbase.AsynchbaseStorage
 import org.apache.s2graph.core.storage.rocks.RocksStorage
@@ -186,6 +187,8 @@ class S2Graph(_config: Config)(implicit val ec: 
ExecutionContext) extends S2Grap
 
   override val management = new Management(this)
 
+  override val modelManager = new ModelManager(this)
+
   override val indexProvider = IndexProvider.apply(config)
 
   override val elementBuilder = new GraphElementBuilder(this)
@@ -247,6 +250,16 @@ class S2Graph(_config: Config)(implicit val ec: 
ExecutionContext) extends S2Grap
     storagePool.getOrElse(s"label:${label.label}", defaultStorage)
   }
 
+  // TODO: fetchers for a ServiceColumn are not pluggable here; this always
+  // hands back the reader of the storage that owns the column's service.
+  override def getFetcher(column: ServiceColumn): Fetcher = {
+    getStorage(column.service).reader
+  }
+
+  /**
+    * Picks the fetcher used to traverse the given label: the one registered
+    * in modelManager when the label carries a fetcher configuration, the
+    * reader of the label's storage otherwise.
+    */
+  override def getFetcher(label: Label): Fetcher = {
+    val useStorageReader = !label.fetchConfigExist
+
+    if (useStorageReader) getStorage(label).reader
+    else modelManager.getFetcher(label)
+  }
+
   override def flushStorage(): Unit = {
     storagePool.foreach { case (_, storage) =>
 

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/72c35a39/s2core/src/main/scala/org/apache/s2graph/core/S2GraphLike.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2GraphLike.scala 
b/s2core/src/main/scala/org/apache/s2graph/core/S2GraphLike.scala
index cbd31cc..fef0078 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/S2GraphLike.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/S2GraphLike.scala
@@ -31,6 +31,7 @@ import 
org.apache.s2graph.core.GraphExceptions.LabelNotExistException
 import org.apache.s2graph.core.S2Graph.{DefaultColumnName, DefaultServiceName}
 import org.apache.s2graph.core.features.{S2Features, S2GraphVariables}
 import org.apache.s2graph.core.index.IndexProvider
+import org.apache.s2graph.core.model.ModelManager
 import org.apache.s2graph.core.schema.{Label, LabelMeta, Service, 
ServiceColumn}
 import org.apache.s2graph.core.storage.{MutateResponse, Storage}
 import org.apache.s2graph.core.types.{InnerValLike, VertexId}
@@ -68,6 +69,8 @@ trait S2GraphLike extends Graph {
 
   val traversalHelper: TraversalHelper
 
+  val modelManager: ModelManager
+
   lazy val MaxRetryNum: Int = config.getInt("max.retry.number")
   lazy val MaxBackOff: Int = config.getInt("max.back.off")
   lazy val BackoffTimeout: Int = config.getInt("back.off.timeout")
@@ -90,6 +93,10 @@ trait S2GraphLike extends Graph {
 
   def getStorage(label: Label): Storage
 
+  def getFetcher(column: ServiceColumn): Fetcher
+
+  def getFetcher(label: Label): Fetcher
+
   def flushStorage(): Unit
 
   def shutdown(modelDataDelete: Boolean = false): Unit

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/72c35a39/s2core/src/main/scala/org/apache/s2graph/core/TraversalHelper.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/main/scala/org/apache/s2graph/core/TraversalHelper.scala 
b/s2core/src/main/scala/org/apache/s2graph/core/TraversalHelper.scala
index 0dc2aa2..003a2d1 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/TraversalHelper.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/TraversalHelper.scala
@@ -204,7 +204,7 @@ class TraversalHelper(graph: S2GraphLike) {
     val aggFuture = requestsPerLabel.foldLeft(Future.successful(Map.empty[Int, 
StepResult])) { case (prevFuture, (label, reqWithIdxs)) =>
       for {
         prev <- prevFuture
-        cur <- graph.getStorage(label).fetches(reqWithIdxs.map(_._1), 
prevStepEdges)
+        cur <- graph.getFetcher(label).fetches(reqWithIdxs.map(_._1), 
prevStepEdges)
       } yield {
         prev ++ reqWithIdxs.map(_._2).zip(cur).toMap
       }
@@ -389,7 +389,7 @@ class TraversalHelper(graph: S2GraphLike) {
             val newEdgeWithScore = edgeWithScore.copy(edge = newEdge)
             /* OrderBy */
             val orderByValues =
-              if (queryOption.orderByKeys.isEmpty) (score, 
edge.getTsInnerValValue(), None, None)
+              if (queryOption.orderByKeys.isEmpty) (score, edge.getTs(), None, 
None)
               else 
StepResult.toTuple4(newEdgeWithScore.toValues(queryOption.orderByKeys))
 
             /* StepGroupBy */

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/72c35a39/s2core/src/main/scala/org/apache/s2graph/core/model/AnnoyModelFetcher.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/main/scala/org/apache/s2graph/core/model/AnnoyModelFetcher.scala 
b/s2core/src/main/scala/org/apache/s2graph/core/model/AnnoyModelFetcher.scala
new file mode 100644
index 0000000..df083fa
--- /dev/null
+++ 
b/s2core/src/main/scala/org/apache/s2graph/core/model/AnnoyModelFetcher.scala
@@ -0,0 +1,87 @@
+package org.apache.s2graph.core.model
+
+import java.io.File
+
+import com.spotify.annoy.{ANNIndex, IndexType}
+import com.typesafe.config.Config
+import org.apache.s2graph.core.types.VertexId
+import org.apache.s2graph.core._
+
+import scala.concurrent.{ExecutionContext, Future}
+import scala.io.Source
+import scala.util.Try
+
+/** Configuration keys and loading helpers for [[AnnoyModelFetcher]]. */
+object AnnoyModelFetcher {
+  val IndexFilePathKey = "annoyIndexFilePath"
+  val DictFilePathKey = "annoyDictFilePath"
+  val DimensionKey = "annoyIndexDimension"
+  val IndexTypeKey = "annoyIndexType"
+
+  /**
+    * Reads the dictionary file into an array, one entry per line, in file
+    * order.
+    *
+    * The underlying Source is closed before returning, including when
+    * reading fails, so the file handle is not leaked.
+    *
+    * NOTE(review): stripMargin only removes a leading margin that ends in
+    * '|'; if whitespace trimming was intended this should be trim -- left
+    * unchanged because it affects the dictionary keys.
+    */
+  def loadDictFromLocal(file: File): Array[String] = {
+    val source = Source.fromFile(file)
+    try {
+      // toArray forces the lazy line iterator while the file is still open.
+      source.getLines().map { line =>
+        line.stripMargin
+      }.toArray
+    } finally {
+      source.close()
+    }
+  }
+
+  /**
+    * Builds the index together with its dictionary from configuration.
+    *
+    * Reads the index file path, dictionary file path and dimension from the
+    * config; the index type falls back to IndexType.ANGULAR when reading
+    * IndexTypeKey fails.
+    */
+  def buildIndex(config: Config): ANNIndexWithDict = {
+    val filePath = config.getString(IndexFilePathKey)
+    val dictPath = config.getString(DictFilePathKey)
+
+    val dimension = config.getInt(DimensionKey)
+    val indexType = Try { config.getString(IndexTypeKey) }.toOption.map(IndexType.valueOf).getOrElse(IndexType.ANGULAR)
+
+    val dict = loadDictFromLocal(new File(dictPath))
+    val index = new ANNIndex(dimension, filePath, indexType)
+    ANNIndexWithDict(index, dict)
+  }
+}
+
+/**
+  * An index bundled with its dictionary.
+  *
+  * dict maps a position to its entry; dictRev is the reverse lookup from
+  * entry to position. When an entry occurs more than once, its last
+  * position wins.
+  */
+case class ANNIndexWithDict(index: ANNIndex, dict: Array[String]) {
+  val dictRev: Map[String, Int] = dict.indices.map(i => dict(i) -> i).toMap
+}
+
+/**
+  * [[Fetcher]] that answers a query with the nearest items returned by an
+  * ANNIndex for the source vertex, instead of reading edges from storage.
+  *
+  * init() must have completed before fetches() is used; until then model
+  * is null.
+  */
+class AnnoyModelFetcher(val graph: S2GraphLike) extends Fetcher {
+  import scala.collection.JavaConverters._
+  val builder = graph.elementBuilder
+
+  // Assigned inside the Future started by init() and read by fetches() on
+  // whichever thread runs the query, hence @volatile so the assignment is
+  // visible across threads.
+  @volatile var model: ANNIndexWithDict = _
+
+  /** Builds the index asynchronously on the given execution context. */
+  override def init(config: Config)(implicit ec: ExecutionContext): Future[Fetcher] = {
+    Future {
+      model = AnnoyModelFetcher.buildIndex(config)
+
+      this
+    }
+  }
+
+  /**
+    * Produces one StepResult per request, in request order.
+    *
+    * A request whose source vertex id is not in the dictionary yields
+    * StepResult.Empty. Otherwise up to queryParam.limit neighbours are
+    * requested from the index and each becomes an edge with score 1.0.
+    */
+  override def fetches(queryRequests: Seq[QueryRequest],
+                       prevStepEdges: Map[VertexId, Seq[EdgeWithScore]])(implicit ec: ExecutionContext): Future[Seq[StepResult]] = {
+    // Read the volatile field once so the whole batch uses the same model.
+    val current = model
+
+    val stepResultLs = queryRequests.map { queryRequest =>
+      val vertex = queryRequest.vertex
+      val queryParam = queryRequest.queryParam
+
+      val srcIndexOpt = current.dictRev.get(vertex.innerId.toIdString())
+
+      srcIndexOpt.map { srcIdx =>
+        val srcVector = current.index.getItemVector(srcIdx)
+        val nns = current.index.getNearest(srcVector, queryParam.limit).asScala
+
+        val edges = nns.map { tgtIdx =>
+          val tgtVertexId = builder.newVertexId(queryParam.label.service,
+            queryParam.label.tgtColumnWithDir(queryParam.labelWithDir.dir), current.dict(tgtIdx))
+
+          graph.toEdge(vertex.innerId.value, tgtVertexId.innerId.value, queryParam.labelName, queryParam.direction)
+        }
+        val edgeWithScores = edges.map(e => EdgeWithScore(e, 1.0, queryParam.label))
+        StepResult(edgeWithScores, Nil, Nil)
+      }.getOrElse(StepResult.Empty)
+    }
+
+    Future.successful(stepResultLs)
+  }
+
+  override def close(): Unit = {
+    // do clean up
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/72c35a39/s2core/src/main/scala/org/apache/s2graph/core/model/ImportStatus.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/main/scala/org/apache/s2graph/core/model/ImportStatus.scala 
b/s2core/src/main/scala/org/apache/s2graph/core/model/ImportStatus.scala
new file mode 100644
index 0000000..63e8cdd
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/model/ImportStatus.scala
@@ -0,0 +1,40 @@
+package org.apache.s2graph.core.model
+
+import java.util.concurrent.atomic.AtomicInteger
+
+/** Progress report of an import, expressed as done out of total. */
+trait ImportStatus {
+  // Counter of finished units; implementations expose it as a mutable
+  // AtomicInteger. (What one unit is remains up to the importer.)
+  val done: AtomicInteger
+
+  /** Whether the import has finished. */
+  def isCompleted: Boolean
+
+  /** Progress as an integer percentage. */
+  def percentage: Int
+
+  // Number of units the import consists of.
+  val total: Int
+}
+
+/**
+  * Status of an import that is still in progress.
+  *
+  * @param total number of units to import; must be positive
+  */
+class ImportRunningStatus(val total: Int) extends ImportStatus {
+  require(total > 0, s"Total should be positive: $total")
+
+  val done = new AtomicInteger(0)
+
+  /** True once the counter has reached exactly total. */
+  def isCompleted: Boolean = total == done.get
+
+  /**
+    * Integer percentage of done relative to total, rounded down.
+    *
+    * The product is computed as a Long: with Int arithmetic 100 * done
+    * wraps around once done exceeds Int.MaxValue / 100 (about 21.4 million),
+    * which produced negative or bogus percentages for large imports.
+    */
+  def percentage: Int = (100L * done.get / total).toInt
+}
+
+/** Constant status of a finished import: one unit out of one, 100%. */
+case object ImportDoneStatus extends ImportStatus {
+  val done: AtomicInteger = new AtomicInteger(1)
+
+  val total: Int = 1
+
+  def percentage: Int = 100
+
+  def isCompleted: Boolean = true
+}
+
+/** Factory and extractor for [[ImportStatus]]. */
+object ImportStatus {
+  /** Creates a running status for an import of the given size. */
+  def apply(total: Int): ImportStatus = new ImportRunningStatus(total)
+
+  /** Extracts (isCompleted, total, current value of done). */
+  def unapply(importResult: ImportStatus): Option[(Boolean, Int, Int)] = {
+    val completed = importResult.isCompleted
+    val totalCount = importResult.total
+    val doneCount = importResult.done.get
+
+    Some((completed, totalCount, doneCount))
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/72c35a39/s2core/src/main/scala/org/apache/s2graph/core/model/Importer.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/model/Importer.scala 
b/s2core/src/main/scala/org/apache/s2graph/core/model/Importer.scala
new file mode 100644
index 0000000..5265483
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/model/Importer.scala
@@ -0,0 +1,99 @@
+package org.apache.s2graph.core.model
+
+import java.io.File
+
+import com.typesafe.config.Config
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.s2graph.core.{Fetcher, S2GraphLike}
+import org.apache.s2graph.core.utils.logger
+
+import scala.concurrent.{ExecutionContext, Future}
+
+/** Helpers shared by importers. */
+object Importer {
+  /**
+    * Builds a Hadoop Configuration from a local configuration directory.
+    *
+    * When the directory exists, core-site.xml and hdfs-site.xml under it are
+    * added as resources. When it does not exist, a warning is logged and a
+    * plain Configuration is returned.
+    *
+    * @throws IllegalStateException if the path exists but is not a readable
+    *                               directory
+    */
+  def toHDFSConfiguration(hdfsConfDir: String): Configuration = {
+    val hadoopConf = new Configuration
+    val confDir = new File(hdfsConfDir)
+
+    if (!confDir.exists()) {
+      logger.warn("RocksDBImporter doesn't have valid hadoop configuration directory..")
+    } else {
+      val readableDirectory = confDir.isDirectory && confDir.canRead
+      if (!readableDirectory) {
+        throw new IllegalStateException(s"HDFS configuration directory ($confDir) cannot be read.")
+      }
+
+      val base = confDir.getAbsolutePath
+      Seq("core-site.xml", "hdfs-site.xml").foreach { fileName =>
+        hadoopConf.addResource(new Path(s"file:///$base/$fileName"))
+      }
+    }
+
+    hadoopConf
+  }
+}
+
+/**
+  * A unit of work that runs before a fetcher is initialised.
+  * ModelManager.importModel calls run(), then close(), then setStatus(true)
+  * once the fetcher has been created.
+  */
+trait Importer {
+  // Completion flag; volatile because it is set from a future callback and
+  // read through status from other threads.
+  @volatile var isFinished: Boolean = false
+
+  /** Performs the import; the future completes with this importer. */
+  def run(config: Config)(implicit ec: ExecutionContext): Future[Importer]
+
+  /** Current value of the completion flag. */
+  def status: Boolean = isFinished
+
+  /**
+    * Sets the completion flag and returns the value read back from it
+    * (which may differ from otherStatus if another thread writes in between).
+    */
+  def setStatus(otherStatus: Boolean): Boolean = {
+    this.isFinished = otherStatus
+    this.isFinished
+  }
+//  def status: ImportStatus
+
+//  def getImportedStorage(graphExecutionContext: ExecutionContext): 
Storage[_, _]
+  /** Releases resources held by the importer. */
+  def close(): Unit
+}
+/** Importer that does nothing: run() completes immediately with itself. */
+case class IdentityImporter(graph: S2GraphLike) extends Importer {
+  override def run(config: Config)(implicit ec: ExecutionContext): Future[Importer] =
+    Future.successful(this)
+
+  override def close(): Unit = ()
+}
+/** Configuration keys and parsing for [[HDFSImporter]]. */
+object HDFSImporter {
+  import scala.collection.JavaConverters._
+  val PathsKey = "paths"
+  val HDFSConfDirKey = "hdfsConfDir"
+
+  /**
+    * Reads the list under PathsKey into a map from each entry's "src" to its
+    * "tgt". When a "src" occurs more than once, the last entry wins.
+    */
+  def extractPaths(config: Config): Map[String, String] = {
+    val entries = config.getConfigList(PathsKey).asScala
+
+    entries.foldLeft(Map.empty[String, String]) { (acc, entry) =>
+      val remote = entry.getString("src")
+      val local = entry.getString("tgt")
+      acc + (remote -> local)
+    }
+  }
+}
+/**
+  * Importer that copies every configured path to the local file system
+  * using the Hadoop FileSystem API.
+  */
+case class HDFSImporter(graph: S2GraphLike) extends Importer {
+
+  import HDFSImporter._
+
+  /**
+    * Copies each src -> tgt pair from the configuration, on the given
+    * execution context. The future fails if any step throws.
+    */
+  override def run(config: Config)(implicit ec: ExecutionContext): Future[Importer] = Future {
+    val srcToTgt = extractPaths(config)
+    val confDir = config.getString(HDFSConfDirKey)
+
+    val hadoopConf = Importer.toHDFSConfiguration(confDir)
+    val fileSystem = FileSystem.get(hadoopConf)
+
+    srcToTgt.foreach { case (remote, local) =>
+      fileSystem.copyToLocalFile(new Path(remote), new Path(local))
+    }
+
+    this
+  }
+
+//  override def status: ImportStatus = ???
+
+  override def close(): Unit = ()
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/72c35a39/s2core/src/main/scala/org/apache/s2graph/core/model/MemoryModelFetcher.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/main/scala/org/apache/s2graph/core/model/MemoryModelFetcher.scala 
b/s2core/src/main/scala/org/apache/s2graph/core/model/MemoryModelFetcher.scala
new file mode 100644
index 0000000..1b0474a
--- /dev/null
+++ 
b/s2core/src/main/scala/org/apache/s2graph/core/model/MemoryModelFetcher.scala
@@ -0,0 +1,41 @@
+package org.apache.s2graph.core.model
+
+import com.typesafe.config.Config
+import org.apache.s2graph.core.types.{InnerValLikeWithTs, VertexId}
+import org.apache.s2graph.core._
+import org.apache.s2graph.core.schema.LabelMeta
+
+import scala.concurrent.{ExecutionContext, Future}
+
+/**
+  * Reference implementation of the Fetcher interface.
+  * It ignores any stored data and produces the same constant set of edges
+  * for every request: one edge per value in ranges, each with score 1.0.
+  */
+class MemoryModelFetcher(val graph: S2GraphLike) extends Fetcher {
+  val builder = graph.elementBuilder
+  val ranges = (0 until 10)
+
+  override def init(config: Config)(implicit ec: ExecutionContext): Future[Fetcher] =
+    Future.successful(this)
+
+  /** Builds the constant edges going out of the request's vertex. */
+  private def constantEdges(queryRequest: QueryRequest) = {
+    val param = queryRequest.queryParam
+
+    for (ith <- ranges) yield {
+      val targetId = builder.newVertexId(
+        param.label.service,
+        param.label.tgtColumnWithDir(param.labelWithDir.dir),
+        ith.toString)
+
+      graph.toEdge(queryRequest.vertex.innerIdVal,
+        targetId.innerId.value, param.label.label, param.direction)
+    }
+  }
+
+  /** Returns one StepResult per request, in request order. */
+  override def fetches(queryRequests: Seq[QueryRequest],
+                       prevStepEdges: Map[VertexId, Seq[EdgeWithScore]])(implicit ec: ExecutionContext): Future[Seq[StepResult]] = {
+    val results = queryRequests.map { queryRequest =>
+      val edges = constantEdges(queryRequest)
+      val scored = edges.map(e => EdgeWithScore(e, 1.0, queryRequest.queryParam.label))
+
+      StepResult(scored, Nil, Nil)
+    }
+
+    Future.successful(results)
+  }
+
+  override def close(): Unit = ()
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/72c35a39/s2core/src/main/scala/org/apache/s2graph/core/model/ModelManager.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/main/scala/org/apache/s2graph/core/model/ModelManager.scala 
b/s2core/src/main/scala/org/apache/s2graph/core/model/ModelManager.scala
new file mode 100644
index 0000000..4afd3e3
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/model/ModelManager.scala
@@ -0,0 +1,87 @@
+package org.apache.s2graph.core.model
+
+import com.typesafe.config.Config
+import org.apache.s2graph.core.schema.Label
+import org.apache.s2graph.core.utils.logger
+import org.apache.s2graph.core.{Fetcher, S2GraphLike}
+
+import scala.concurrent.{ExecutionContext, Future}
+
+/** Configuration keys read by [[ModelManager]]. */
+object ModelManager {
+  // Key holding the class name of the Fetcher to instantiate reflectively.
+  val FetcherClassNameKey = "fetchClassName"
+  // Key holding the class name of the Importer to instantiate reflectively.
+  val ImporterClassNameKey = "importerClassName"
+}
+
+/**
+  * Keeps the [[Fetcher]] registered for each label and drives model imports.
+  *
+  * An import runs the configured Importer, then initialises the configured
+  * Fetcher and registers it under the label's key, closing the fetcher it
+  * replaces. At most one import per label is in flight at a time.
+  */
+class ModelManager(s2GraphLike: S2GraphLike) {
+  import ModelManager._
+
+  // Written from import callbacks on the ExecutionContext and read by
+  // getFetcher on query threads, so it must be a concurrent map.
+  private val fetcherPool = scala.collection.concurrent.TrieMap.empty[String, Fetcher]
+  // In-flight importer per label key; presence of a key acts as the lock.
+  private val ImportLock = new java.util.concurrent.ConcurrentHashMap[String, Importer]
+
+  /** Key under which a label's importer and fetcher are tracked. */
+  def toImportLockKey(label: Label): String = label.label
+
+  /**
+    * Returns the fetcher registered for the label.
+    *
+    * @throws IllegalStateException if no import has completed for the label
+    */
+  def getFetcher(label: Label): Fetcher = {
+    fetcherPool.getOrElse(toImportLockKey(label), throw new IllegalStateException(s"$label is not imported."))
+  }
+
+  /**
+    * Reflectively instantiates the Importer named by ImporterClassNameKey,
+    * using its constructor that takes an S2GraphLike.
+    */
+  def initImporter(config: Config): Importer = {
+    val className = config.getString(ImporterClassNameKey)
+
+    Class.forName(className)
+      .getConstructor(classOf[S2GraphLike])
+      .newInstance(s2GraphLike)
+      .asInstanceOf[Importer]
+  }
+
+  /**
+    * Reflectively instantiates the Fetcher named by FetcherClassNameKey,
+    * using its constructor that takes an S2GraphLike, and initialises it.
+    */
+  def initFetcher(config: Config)(implicit ec: ExecutionContext): Future[Fetcher] = {
+    val className = config.getString(FetcherClassNameKey)
+
+    val fetcher = Class.forName(className)
+      .getConstructor(classOf[S2GraphLike])
+      .newInstance(s2GraphLike)
+      .asInstanceOf[Fetcher]
+
+    fetcher.init(config)
+  }
+
+  /**
+    * Starts an import for the label unless one is already in flight, and
+    * returns the in-flight importer immediately (the returned future is
+    * already completed). Poll Importer.status to learn when the new fetcher
+    * is in place.
+    *
+    * @param label  label to import for
+    * @param config configuration with an "importer" and a "fetcher" section
+    */
+  def importModel(label: Label, config: Config)(implicit ec: ExecutionContext): Future[Importer] = {
+    val inFlight = ImportLock.computeIfAbsent(toImportLockKey(label), new java.util.function.Function[String, Importer] {
+      override def apply(k: String): Importer = {
+        val importerConfig = config.getConfig("importer")
+        val importer = initImporter(importerConfig)
+
+        //TODO: Update Label's extra options.
+        importer
+          .run(importerConfig)
+          // flatMap, not map: the lock must stay held and failures must
+          // propagate until the fetcher has actually been initialised.
+          .flatMap { finished =>
+            logger.info(s"Close importer")
+            finished.close()
+
+            initFetcher(config.getConfig("fetcher")).map { fetcher =>
+              // put swaps atomically, so there is never a moment with no
+              // fetcher registered while the old one is being replaced.
+              fetcherPool.put(k, fetcher).foreach { oldFetcher =>
+                logger.info(s"Delete old storage ($k) => $oldFetcher")
+                oldFetcher.close()
+              }
+
+              // Flip the flag last: once status is true, getFetcher works.
+              finished.setStatus(true)
+            }
+          }
+          .onComplete { result =>
+            result.failed.foreach { e =>
+              logger.error(s"Import failed ($k): $e")
+            }
+            logger.info(s"ImportLock release: $k")
+            ImportLock.remove(k)
+          }
+        importer
+      }
+    })
+
+    Future.successful(inFlight)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/72c35a39/s2core/src/main/scala/org/apache/s2graph/core/schema/Label.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/schema/Label.scala 
b/s2core/src/main/scala/org/apache/s2graph/core/schema/Label.scala
index 7fb1183..c671381 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/schema/Label.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/schema/Label.scala
@@ -369,19 +369,6 @@ case class Label(id: Option[Int], label: String,
     prop <- metaProps if LabelMeta.isValidSeq(prop.seq)
     jsValue <- innerValToJsValue(toInnerVal(prop.defaultValue, prop.dataType, 
schemaVersion), prop.dataType)
   } yield prop -> jsValue).toMap
-//  lazy val extraOptions = Model.extraOptions(Option("""{
-//    "storage": {
-//      "s2graph.storage.backend": "rocks",
-//      "rocks.db.path": "/tmp/db"
-//    }
-//  }"""))
-
-  lazy val tokens: Set[String] = 
extraOptions.get("tokens").fold(Set.empty[String]) {
-    case JsArray(tokens) => tokens.map(_.as[String]).toSet
-    case _ =>
-      logger.error("Invalid token JSON")
-      Set.empty[String]
-  }
 
   lazy val extraOptions = Schema.extraOptions(options)
 
@@ -389,8 +376,13 @@ case class Label(id: Option[Int], label: String,
 
   lazy val storageConfigOpt: Option[Config] = toStorageConfig
 
+  lazy val fetchConfigExist: Boolean = toFetcherConfig.isDefined
+
+  /**
+    * Config built from the "fetcher" entry of this label's extra options,
+    * as returned by Schema.toConfig.
+    */
+  def toFetcherConfig: Option[Config] = {
+    Schema.toConfig(extraOptions, "fetcher")
+  }
   def toStorageConfig: Option[Config] = {
-    Schema.toStorageConfig(extraOptions)
+    Schema.toConfig(extraOptions, "storage")
   }
 
   def srcColumnWithDir(dir: Int) = {

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/72c35a39/s2core/src/main/scala/org/apache/s2graph/core/schema/Schema.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/schema/Schema.scala 
b/s2core/src/main/scala/org/apache/s2graph/core/schema/Schema.scala
index ebae966..255cd5a 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/schema/Schema.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/schema/Schema.scala
@@ -174,9 +174,9 @@ object Schema {
       }
   }
 
-  def toStorageConfig(options: Map[String, JsValue]): Option[Config] = {
+  def toConfig(options: Map[String, JsValue], key: String): Option[Config] = {
     try {
-      options.get("storage").map { jsValue =>
+      options.get(key).map { jsValue =>
         import scala.collection.JavaConverters._
         val configMap = jsValue.as[JsObject].fieldSet.toMap.map { case (key, 
value) =>
           key -> JSONParser.jsValueToAny(value).getOrElse(throw new 
RuntimeException("!!"))

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/72c35a39/s2core/src/main/scala/org/apache/s2graph/core/schema/Service.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/schema/Service.scala 
b/s2core/src/main/scala/org/apache/s2graph/core/schema/Service.scala
index 611a746..dbbfed7 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/schema/Service.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/schema/Service.scala
@@ -129,5 +129,5 @@ case class Service(id: Option[Int],
   lazy val extraOptions = Schema.extraOptions(options)
   lazy val storageConfigOpt: Option[Config] = toStorageConfig
   def serviceColumns(useCache: Boolean): Seq[ServiceColumn] = 
ServiceColumn.findByServiceId(id.get, useCache = useCache)
-  def toStorageConfig: Option[Config] = Schema.toStorageConfig(extraOptions)
+  def toStorageConfig: Option[Config] = Schema.toConfig(extraOptions, 
"storage")
 }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/72c35a39/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala 
b/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
index 18f6b1e..6ad62b1 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
@@ -40,7 +40,7 @@ abstract class Storage(val graph: S2GraphLike,
    * Given QueryRequest/Vertex/Edge, fetch KeyValue from storage
    * then convert them into Edge/Vertex
    */
-  val fetcher: StorageReadable
+  val reader: StorageReadable
 
   /*
    * Serialize Edge/Vertex, to common KeyValue, SKeyValue that
@@ -60,7 +60,7 @@ abstract class Storage(val graph: S2GraphLike,
    * Note that it require storage backend specific implementations for
    * all of StorageWritable, StorageReadable, StorageSerDe, StorageIO
    */
-  lazy val conflictResolver: WriteWriteConflictResolver = new 
WriteWriteConflictResolver(graph, serDe, io, mutator, fetcher)
+  lazy val conflictResolver: WriteWriteConflictResolver = new 
WriteWriteConflictResolver(graph, serDe, io, mutator, reader)
 
   lazy val mutationHelper: MutationHelper = new MutationHelper(this)
 
@@ -74,17 +74,17 @@ abstract class Storage(val graph: S2GraphLike,
   /** Fetch **/
   def fetches(queryRequests: Seq[QueryRequest],
               prevStepEdges: Map[VertexId, Seq[EdgeWithScore]])(implicit ec: 
ExecutionContext): Future[Seq[StepResult]] =
-    fetcher.fetches(queryRequests, prevStepEdges)
+    reader.fetches(queryRequests, prevStepEdges)
 
   def fetchVertices(vertices: Seq[S2VertexLike])(implicit ec: 
ExecutionContext): Future[Seq[S2VertexLike]] =
-    fetcher.fetchVertices(vertices)
+    reader.fetchVertices(vertices)
 
-  def fetchEdgesAll()(implicit ec: ExecutionContext): Future[Seq[S2EdgeLike]] 
= fetcher.fetchEdgesAll()
+  def fetchEdgesAll()(implicit ec: ExecutionContext): Future[Seq[S2EdgeLike]] 
= reader.fetchEdgesAll()
 
-  def fetchVerticesAll()(implicit ec: ExecutionContext): 
Future[Seq[S2VertexLike]] = fetcher.fetchVerticesAll()
+  def fetchVerticesAll()(implicit ec: ExecutionContext): 
Future[Seq[S2VertexLike]] = reader.fetchVerticesAll()
 
   def fetchSnapshotEdgeInner(edge: S2EdgeLike)(implicit ec: ExecutionContext): 
Future[(Option[S2EdgeLike], Option[SKeyValue])] =
-    fetcher.fetchSnapshotEdgeInner(edge)
+    reader.fetchSnapshotEdgeInner(edge)
 
   /** Management **/
   def flush(): Unit = management.flush()

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/72c35a39/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageReadable.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageReadable.scala 
b/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageReadable.scala
index 0965f68..b10feb9 100644
--- 
a/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageReadable.scala
+++ 
b/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageReadable.scala
@@ -19,6 +19,7 @@
 
 package org.apache.s2graph.core.storage
 
+import com.typesafe.config.Config
 import org.apache.s2graph.core.GraphExceptions.FetchTimeoutException
 import org.apache.s2graph.core._
 import org.apache.s2graph.core.types.VertexId
@@ -26,18 +27,18 @@ import org.apache.s2graph.core.utils.logger
 
 import scala.concurrent.{ExecutionContext, Future}
 
-trait StorageReadable {
+trait StorageReadable extends Fetcher {
   val io: StorageIO
   val serDe: StorageSerDe
- /**
-    * responsible to fire parallel fetch call into storage and create future 
that will return merged result.
-    *
-    * @param queryRequests
-    * @param prevStepEdges
-    * @return
-    */
-  def fetches(queryRequests: Seq[QueryRequest],
-              prevStepEdges: Map[VertexId, Seq[EdgeWithScore]])(implicit ec: 
ExecutionContext): Future[Seq[StepResult]]
+// /**
+//    * responsible to fire parallel fetch call into storage and create future 
that will return merged result.
+//    *
+//    * @param queryRequests
+//    * @param prevStepEdges
+//    * @return
+//    */
+//  def fetches(queryRequests: Seq[QueryRequest],
+//              prevStepEdges: Map[VertexId, Seq[EdgeWithScore]])(implicit ec: 
ExecutionContext): Future[Seq[StepResult]]
 
   def fetchEdgesAll()(implicit ec: ExecutionContext): Future[Seq[S2EdgeLike]]
 
@@ -92,4 +93,5 @@ trait StorageReadable {
 
     Future.sequence(futures).map(_.flatten)
   }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/72c35a39/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
 
b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
index 8b3d862..e233277 100644
--- 
a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
+++ 
b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
@@ -155,13 +155,6 @@ class AsynchbaseStorage(override val graph: S2GraphLike,
 
   override val serDe: StorageSerDe = new AsynchbaseStorageSerDe(graph)
 
-  override val fetcher: StorageReadable = new AsynchbaseStorageReadable(graph, 
config, client, serDe, io)
-
-  //  val hbaseExecutor: ExecutorService  =
-  //    if (config.getString("hbase.zookeeper.quorum") == "localhost")
-  //      AsynchbaseStorage.initLocalHBase(config)
-  //    else
-  //      null
-
+  override val reader: StorageReadable = new AsynchbaseStorageReadable(graph, 
config, client, serDe, io)
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/72c35a39/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorage.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorage.scala
 
b/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorage.scala
index 11fae17..e53aeb3 100644
--- 
a/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorage.scala
+++ 
b/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorage.scala
@@ -156,5 +156,5 @@ class RocksStorage(override val graph: S2GraphLike,
 
   override val serDe = new RocksStorageSerDe(graph)
 
-  override val fetcher = new RocksStorageReadable(graph, config, db, vdb, 
serDe, io)
+  override val reader = new RocksStorageReadable(graph, config, db, vdb, 
serDe, io)
 }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/72c35a39/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorageReadable.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorageReadable.scala
 
b/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorageReadable.scala
index 5db02cc..27e3efd 100644
--- 
a/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorageReadable.scala
+++ 
b/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorageReadable.scala
@@ -27,7 +27,7 @@ import org.apache.s2graph.core._
 import org.apache.s2graph.core.schema.{Label, ServiceColumn}
 import org.apache.s2graph.core.storage.rocks.RocksHelper.{GetRequest, 
RocksRPC, ScanWithRange}
 import org.apache.s2graph.core.storage.serde.StorageSerializable
-import org.apache.s2graph.core.storage.{SKeyValue, StorageIO, StorageReadable, 
StorageSerDe}
+import org.apache.s2graph.core.storage._
 import org.apache.s2graph.core.types.{HBaseType, VertexId}
 import org.rocksdb.RocksDB
 

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/72c35a39/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/MutationHelper.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/MutationHelper.scala
 
b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/MutationHelper.scala
index 0748efb..fecc6ea 100644
--- 
a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/MutationHelper.scala
+++ 
b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/MutationHelper.scala
@@ -29,7 +29,7 @@ import scala.concurrent.{ExecutionContext, Future}
 class MutationHelper(storage: Storage) {
   val serDe = storage.serDe
   val io = storage.io
-  val fetcher = storage.fetcher
+  val fetcher = storage.reader
   val mutator = storage.mutator
   val conflictResolver = storage.conflictResolver
 

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/72c35a39/s2core/src/test/scala/org/apache/s2graph/core/model/FetcherTest.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/test/scala/org/apache/s2graph/core/model/FetcherTest.scala 
b/s2core/src/test/scala/org/apache/s2graph/core/model/FetcherTest.scala
new file mode 100644
index 0000000..e89f8de
--- /dev/null
+++ b/s2core/src/test/scala/org/apache/s2graph/core/model/FetcherTest.scala
@@ -0,0 +1,146 @@
+package org.apache.s2graph.core.model
+
+import com.typesafe.config.ConfigFactory
+import org.apache.s2graph.core.Integrate.IntegrateCommon
+import org.apache.s2graph.core.Management.JsonModel.{Index, Prop}
+import org.apache.s2graph.core.schema.Label
+import org.apache.s2graph.core.{Query, QueryParam}
+
+import scala.concurrent.duration.Duration
+import scala.concurrent.{Await, ExecutionContext}
+
+/**
+ * Integration tests that create a label whose options name an importer class and a
+ * fetcher class, run `graph.modelManager.importModel` for it, and then query the
+ * label through `graph.getEdges`.
+ *
+ * NOTE(review): neither test asserts on the returned edges; they are only printed.
+ */
+class FetcherTest extends IntegrateCommon{
+  import TestUtil._
+  import scala.collection.JavaConverters._
+
+  test("MemoryModelFetcher") {
+    // 1. create label.
+    // 2. importLabel.
+    // 3. fetch.
+    val service = management.createService("s2graph", "localhost", "s2graph_htable", -1, None).get
+    val serviceColumn =
+      management.createServiceColumn("s2graph", "user", "string", Seq(Prop("age", "0", "int", true)))
+    val labelName = "fetcher_test"
+    val options = s"""{
+                     |
+                     | "importer": {
+                     |   "${ModelManager.ImporterClassNameKey}": "org.apache.s2graph.core.model.IdentityImporter"
+                     | },
+                     | "fetcher": {
+                     |   "${ModelManager.FetcherClassNameKey}": "org.apache.s2graph.core.model.MemoryModelFetcher"
+                     | }
+                     |}""".stripMargin
+
+    // Delete any existing label with this name before creating it again.
+    Label.findByName(labelName, useCache = false).foreach { label => Label.delete(label.id.get) }
+
+    val label = management.createLabel(
+      labelName,
+      serviceColumn,
+      serviceColumn,
+      true,
+      service.serviceName,
+      Seq.empty[Index].asJava,
+      Seq.empty[Prop].asJava,
+      "strong",
+      null,
+      -1,
+      "v3",
+      "gz",
+      options
+    )
+    val config = ConfigFactory.parseString(options)
+    val importerFuture = graph.modelManager.importModel(label, config)(ExecutionContext.Implicits.global)
+    // Await.result (not Await.ready) so that a failed import fails the test instead of being ignored.
+    Await.result(importerFuture, Duration("60 seconds"))
+
+    // NOTE(review): fixed pause kept as-is; presumably lets the imported fetcher become visible -- confirm.
+    Thread.sleep(1000)
+
+    val vertex = graph.elementBuilder.toVertex(service.serviceName, serviceColumn.columnName, "daewon")
+    val queryParam = QueryParam(labelName = labelName)
+
+    val query = Query.toQuery(srcVertices = Seq(vertex), queryParams = Seq(queryParam))
+    val stepResult = Await.result(graph.getEdges(query), Duration("60 seconds"))
+
+    stepResult.edgeWithScores.foreach { es =>
+      println(es.edge)
+    }
+  }
+  test("AnnoyModelFetcher") {
+
+    val labelName = "annoy_model_fetcher_test"
+    // Machine-specific locations: each one can be overridden with a JVM system property
+    // and falls back to the previously hard-coded value when the property is not set.
+    val hdfsConfDir = sys.props.getOrElse("s2graph.test.hdfsConfDir", "/usr/local/Cellar/hadoop/2.7.3/libexec/etc/hadoop/")
+
+    val REMOTE_INDEX_FILE = sys.props.getOrElse("s2graph.test.remoteIndexFile", "/Users/shon/Downloads/test-index.tree")
+    val LOCAL_INDEX_FILE = sys.props.getOrElse("s2graph.test.localIndexFile", "./test-index.tree")
+    val REMOTE_DICT_FILE = sys.props.getOrElse("s2graph.test.remoteDictFile", "/Users/shon/Downloads/test-index.dict")
+    val LOCAL_DICT_FILE = sys.props.getOrElse("s2graph.test.localDictFile", "./test-index.dict")
+
+    val service = management.createService("s2graph", "localhost", "s2graph_htable", -1, None).get
+    val serviceColumn =
+      management.createServiceColumn("s2graph", "user", "string", Seq(Prop("age", "0", "int", true)))
+
+    val options = s"""{
+                     | "importer": {
+                     |   "${ModelManager.ImporterClassNameKey}": "org.apache.s2graph.core.model.HDFSImporter",
+                     |   "${HDFSImporter.HDFSConfDirKey}": "$hdfsConfDir",
+                     |   "${HDFSImporter.PathsKey}": [{
+                     |      "src": "${REMOTE_INDEX_FILE}",
+                     |      "tgt": "${LOCAL_INDEX_FILE}"
+                     |   }, {
+                     |      "src": "${REMOTE_DICT_FILE}",
+                     |      "tgt": "${LOCAL_DICT_FILE}"
+                     |   }]
+                     | },
+                     | "fetcher": {
+                     |   "${ModelManager.FetcherClassNameKey}": "org.apache.s2graph.core.model.AnnoyModelFetcher",
+                     |   "${AnnoyModelFetcher.IndexFilePathKey}": "${LOCAL_INDEX_FILE}",
+                     |   "${AnnoyModelFetcher.DictFilePathKey}": "${LOCAL_DICT_FILE}",
+                     |   "${AnnoyModelFetcher.DimensionKey}": 10
+                     | }
+                     |}""".stripMargin
+
+    Label.findByName(labelName, useCache = false).foreach { label => Label.delete(label.id.get) }
+
+    val label = management.createLabel(
+      labelName,
+      serviceColumn,
+      serviceColumn,
+      true,
+      service.serviceName,
+      Seq.empty[Index].asJava,
+      Seq.empty[Prop].asJava,
+      "strong",
+      null,
+      -1,
+      "v3",
+      "gz",
+      options
+    )
+    val config = ConfigFactory.parseString(options)
+    val importerFuture = graph.modelManager.importModel(label, config)(ExecutionContext.Implicits.global)
+    Await.result(importerFuture, Duration("3 minutes"))
+
+    Thread.sleep(10000)
+
+    val vertex = graph.elementBuilder.toVertex(service.serviceName, serviceColumn.columnName, "0")
+    val queryParam = QueryParam(labelName = labelName, limit = 5)
+
+    val query = Query.toQuery(srcVertices = Seq(vertex), queryParams = Seq(queryParam))
+    val stepResult = Await.result(graph.getEdges(query), Duration("60 seconds"))
+
+    stepResult.edgeWithScores.foreach { es =>
+      println(es.edge)
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/72c35a39/s2core/src/test/scala/org/apache/s2graph/core/model/HDFSImporterTest.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/test/scala/org/apache/s2graph/core/model/HDFSImporterTest.scala 
b/s2core/src/test/scala/org/apache/s2graph/core/model/HDFSImporterTest.scala
new file mode 100644
index 0000000..385c9a7
--- /dev/null
+++ b/s2core/src/test/scala/org/apache/s2graph/core/model/HDFSImporterTest.scala
@@ -0,0 +1,89 @@
+package org.apache.s2graph.core.model
+
+import com.typesafe.config.ConfigFactory
+import org.apache.s2graph.core.Integrate.IntegrateCommon
+import org.apache.s2graph.core.Management.JsonModel.{Index, Prop}
+import org.apache.s2graph.core.{Query, QueryParam}
+import org.apache.s2graph.core.schema.Label
+
+import scala.concurrent.duration.Duration
+import scala.concurrent.{Await, ExecutionContext}
+
+/**
+ * Integration test that creates a label configured with HDFSImporter (two src/tgt
+ * path pairs) and MemoryModelFetcher, runs `graph.modelManager.importModel` for it,
+ * and then queries the label through `graph.getEdges`.
+ *
+ * NOTE(review): the returned edges are only printed, never asserted on.
+ */
+class HDFSImporterTest extends IntegrateCommon {
+  import scala.collection.JavaConverters._
+
+  test("hdfs test.") {
+
+    val labelName = "hdfs_importer_test"
+    // Machine-specific locations: each one can be overridden with a JVM system property
+    // and falls back to the previously hard-coded value when the property is not set.
+    val hdfsConfDir = sys.props.getOrElse("s2graph.test.hdfsConfDir", "/usr/local/Cellar/hadoop/2.7.3/libexec/etc/hadoop/")
+
+    val REMOTE_INDEX_FILE = sys.props.getOrElse("s2graph.test.remoteIndexFile", "/Users/shon/Downloads/test-index.tree")
+    val LOCAL_INDEX_FILE = sys.props.getOrElse("s2graph.test.localIndexFile", "./test-index.tree")
+    val REMOTE_DICT_FILE = sys.props.getOrElse("s2graph.test.remoteDictFile", "/Users/shon/Downloads/test-index.dict")
+    val LOCAL_DICT_FILE = sys.props.getOrElse("s2graph.test.localDictFile", "./test-index.dict")
+
+    val service = management.createService("s2graph", "localhost", "s2graph_htable", -1, None).get
+    val serviceColumn =
+      management.createServiceColumn("s2graph", "user", "string", Seq(Prop("age", "0", "int", true)))
+
+    val options = s"""{
+                     | "importer": {
+                     |   "${ModelManager.ImporterClassNameKey}": "org.apache.s2graph.core.model.HDFSImporter",
+                     |   "${HDFSImporter.HDFSConfDirKey}": "$hdfsConfDir",
+                     |   "${HDFSImporter.PathsKey}": [{
+                     |       "src":  "${REMOTE_INDEX_FILE}",
+                     |       "tgt": "${LOCAL_INDEX_FILE}"
+                     |     }, {
+                     |       "src": "${REMOTE_DICT_FILE}",
+                     |       "tgt": "${LOCAL_DICT_FILE}"
+                     |     }
+                     |   ]
+                     | },
+                     | "fetcher": {
+                     |   "${ModelManager.FetcherClassNameKey}": "org.apache.s2graph.core.model.MemoryModelFetcher"
+                     | }
+                     |}""".stripMargin
+
+    Label.findByName(labelName, useCache = false).foreach { label => Label.delete(label.id.get) }
+
+    val label = management.createLabel(
+      labelName,
+      serviceColumn,
+      serviceColumn,
+      true,
+      service.serviceName,
+      Seq.empty[Index].asJava,
+      Seq.empty[Prop].asJava,
+      "strong",
+      null,
+      -1,
+      "v3",
+      "gz",
+      options
+    )
+    val config = ConfigFactory.parseString(options)
+    val importerFuture = graph.modelManager.importModel(label, config)(ExecutionContext.Implicits.global)
+    Await.result(importerFuture, Duration("3 minutes"))
+
+    Thread.sleep(10000)
+
+    val vertex = graph.elementBuilder.toVertex(service.serviceName, serviceColumn.columnName, "0")
+    val queryParam = QueryParam(labelName = labelName, limit = 5)
+
+    val query = Query.toQuery(srcVertices = Seq(vertex), queryParams = Seq(queryParam))
+    val stepResult = Await.result(graph.getEdges(query), Duration("60 seconds"))
+
+    stepResult.edgeWithScores.foreach { es =>
+      println(es.edge)
+    }
+  }
+}


Reply via email to