http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/be83d07c/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksEdgeFetcher.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksEdgeFetcher.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksEdgeFetcher.scala
index 628c5e1..796e52e 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksEdgeFetcher.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksEdgeFetcher.scala
@@ -21,10 +21,12 @@ package org.apache.s2graph.core.storage.rocks
 
 import com.typesafe.config.Config
 import org.apache.s2graph.core._
-import org.apache.s2graph.core.storage.{StorageIO, StorageSerDe}
-import org.apache.s2graph.core.types.VertexId
+import org.apache.s2graph.core.schema.Label
+import org.apache.s2graph.core.storage.{SKeyValue, StorageIO, StorageSerDe}
+import org.apache.s2graph.core.types.{HBaseType, VertexId}
 import org.rocksdb.RocksDB
 
+import scala.collection.mutable.ArrayBuffer
 import scala.concurrent.{ExecutionContext, Future}
 
 class RocksEdgeFetcher(val graph: S2GraphLike,
@@ -57,4 +59,33 @@ class RocksEdgeFetcher(val graph: S2GraphLike,
 
     Future.sequence(futures)
   }
+
+  override def fetchEdgesAll()(implicit ec: ExecutionContext) = {
+    val edges = new ArrayBuffer[S2EdgeLike]()
+    Label.findAll().groupBy(_.hbaseTableName).toSeq.foreach { case (hTableName, labels) =>
+      val distinctLabels = labels.toSet
+
+      val iter = db.newIterator()
+      try {
+        iter.seekToFirst()
+        while (iter.isValid) {
+          val kv = SKeyValue(table, iter.key(), SKeyValue.EdgeCf, qualifier, iter.value, System.currentTimeMillis())
+
+          serDe.indexEdgeDeserializer(schemaVer = HBaseType.DEFAULT_VERSION).fromKeyValues(Seq(kv), None)
+            .filter(e => distinctLabels(e.innerLabel) && e.getDirection() == "out" && !e.isDegree)
+            .foreach { edge =>
+              edges += edge
+            }
+
+
+          iter.next()
+        }
+
+      } finally {
+        iter.close()
+      }
+    }
+
+    Future.successful(edges)
+  }
 }
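
A minimal caller sketch for the bulk scan that now lives on RocksEdgeFetcher; the storage value
and the 60-second timeout are illustrative assumptions, not part of this commit:

    import scala.concurrent.Await
    import scala.concurrent.ExecutionContext.Implicits.global
    import scala.concurrent.duration.Duration

    // storage is assumed to be an already-initialized RocksStorage instance.
    // fetchEdgesAll scans the whole RocksDB, deserializes index edges per label table,
    // and keeps only "out"-direction, non-degree edges.
    val allEdges = Await.result(storage.edgeFetcher.fetchEdgesAll(), Duration("60 seconds"))
    println(s"scanned ${allEdges.size} edges")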

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/be83d07c/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorage.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorage.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorage.scala
index 8948e13..aaf9086 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorage.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorage.scala
@@ -238,16 +238,13 @@ class RocksStorage(override val graph: S2GraphLike,
 
   private lazy val optimisticEdgeFetcher = new RocksOptimisticEdgeFetcher(graph, config, db, vdb, serDe, io)
   private lazy val optimisticMutator = new RocksOptimisticMutator(graph, serDe, optimisticEdgeFetcher, db, vdb, lockMap)
-  private lazy val _mutator = new DefaultOptimisticMutator(graph, serDe, optimisticEdgeFetcher, optimisticMutator)
 
   override val management: StorageManagement = new RocksStorageManagement(config, vdb, db)
   override val serDe: StorageSerDe = new RocksStorageSerDe(graph)
 
   override val edgeFetcher: EdgeFetcher = new RocksEdgeFetcher(graph, config, db, vdb, serDe, io)
-  override val edgeBulkFetcher: EdgeBulkFetcher = new RocksEdgeBulkFetcher(graph, config, db, vdb, serDe, io)
   override val vertexFetcher: VertexFetcher = new RocksVertexFetcher(graph, config, db, vdb, serDe, io)
-  override val vertexBulkFetcher: VertexBulkFetcher = new RocksVertexBulkFetcher(graph, config, db, vdb, serDe, io)
 
-  override val edgeMutator: EdgeMutator = _mutator
-  override val vertexMutator: VertexMutator = _mutator
+  override val edgeMutator: EdgeMutator = new DefaultOptimisticEdgeMutator(graph, serDe, optimisticEdgeFetcher, optimisticMutator, io)
+  override val vertexMutator: VertexMutator = new DefaultOptimisticVertexMutator(graph, serDe, optimisticEdgeFetcher, optimisticMutator, io)
 }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/be83d07c/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksVertexBulkFetcher.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksVertexBulkFetcher.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksVertexBulkFetcher.scala
deleted file mode 100644
index 20acfaa..0000000
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksVertexBulkFetcher.scala
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.s2graph.core.storage.rocks
-
-import com.typesafe.config.Config
-import org.apache.hadoop.hbase.util.Bytes
-import org.apache.s2graph.core.schema.ServiceColumn
-import org.apache.s2graph.core.{S2GraphLike, S2VertexLike, VertexBulkFetcher}
-import org.apache.s2graph.core.storage.{SKeyValue, StorageIO, StorageSerDe}
-import org.apache.s2graph.core.types.HBaseType
-import org.rocksdb.RocksDB
-
-import scala.collection.mutable.ArrayBuffer
-import scala.concurrent.{ExecutionContext, Future}
-
-class RocksVertexBulkFetcher(val graph: S2GraphLike,
-                             val config: Config,
-                             val db: RocksDB,
-                             val vdb: RocksDB,
-                             val serDe: StorageSerDe,
-                             val io: StorageIO) extends VertexBulkFetcher {
-  import RocksStorage._
-
-  override def fetchVerticesAll()(implicit ec: ExecutionContext) = {
-    import scala.collection.mutable
-
-    val vertices = new ArrayBuffer[S2VertexLike]()
-    ServiceColumn.findAll().groupBy(_.service.hTableName).toSeq.foreach { case (hTableName, columns) =>
-      val distinctColumns = columns.toSet
-
-      val iter = vdb.newIterator()
-      val buffer = mutable.ListBuffer.empty[SKeyValue]
-      var oldVertexIdBytes = Array.empty[Byte]
-      var minusPos = 0
-
-      try {
-        iter.seekToFirst()
-        while (iter.isValid) {
-          val row = iter.key()
-          if (!Bytes.equals(oldVertexIdBytes, 0, oldVertexIdBytes.length - minusPos, row, 0, row.length - 1)) {
-            if (buffer.nonEmpty)
-              serDe.vertexDeserializer(schemaVer = HBaseType.DEFAULT_VERSION).fromKeyValues(buffer, None)
-                .filter(v => distinctColumns(v.serviceColumn))
-                .foreach { vertex =>
-                  vertices += vertex
-                }
-
-            oldVertexIdBytes = row
-            minusPos = 1
-            buffer.clear()
-          }
-          val kv = SKeyValue(table, iter.key(), SKeyValue.VertexCf, qualifier, iter.value(), System.currentTimeMillis())
-          buffer += kv
-
-          iter.next()
-        }
-        if (buffer.nonEmpty)
-          serDe.vertexDeserializer(schemaVer = HBaseType.DEFAULT_VERSION).fromKeyValues(buffer, None)
-            .filter(v => distinctColumns(v.serviceColumn))
-            .foreach { vertex =>
-              vertices += vertex
-            }
-
-      } finally {
-        iter.close()
-      }
-    }
-
-    Future.successful(vertices)
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/be83d07c/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksVertexFetcher.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksVertexFetcher.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksVertexFetcher.scala
index 6becd98..2d3880c 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksVertexFetcher.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksVertexFetcher.scala
@@ -20,10 +20,15 @@
 package org.apache.s2graph.core.storage.rocks
 
 import com.typesafe.config.Config
+import org.apache.hadoop.hbase.util.Bytes
 import org.apache.s2graph.core._
+import org.apache.s2graph.core.schema.ServiceColumn
+import org.apache.s2graph.core.storage.rocks.RocksStorage.{qualifier, table}
 import org.apache.s2graph.core.storage.{SKeyValue, StorageIO, StorageSerDe}
+import org.apache.s2graph.core.types.HBaseType
 import org.rocksdb.RocksDB
 
+import scala.collection.mutable.ArrayBuffer
 import scala.concurrent.{ExecutionContext, Future}
 
 class RocksVertexFetcher(val graph: S2GraphLike,
@@ -58,4 +63,52 @@ class RocksVertexFetcher(val graph: S2GraphLike,
 
     Future.sequence(futures).map(_.flatten)
   }
+
+  override def fetchVerticesAll()(implicit ec: ExecutionContext) = {
+    import scala.collection.mutable
+
+    val vertices = new ArrayBuffer[S2VertexLike]()
+    ServiceColumn.findAll().groupBy(_.service.hTableName).toSeq.foreach { case (hTableName, columns) =>
+      val distinctColumns = columns.toSet
+
+      val iter = vdb.newIterator()
+      val buffer = mutable.ListBuffer.empty[SKeyValue]
+      var oldVertexIdBytes = Array.empty[Byte]
+      var minusPos = 0
+
+      try {
+        iter.seekToFirst()
+        while (iter.isValid) {
+          val row = iter.key()
+          if (!Bytes.equals(oldVertexIdBytes, 0, oldVertexIdBytes.length - minusPos, row, 0, row.length - 1)) {
+            if (buffer.nonEmpty)
+              serDe.vertexDeserializer(schemaVer = HBaseType.DEFAULT_VERSION).fromKeyValues(buffer, None)
+                .filter(v => distinctColumns(v.serviceColumn))
+                .foreach { vertex =>
+                  vertices += vertex
+                }
+
+            oldVertexIdBytes = row
+            minusPos = 1
+            buffer.clear()
+          }
+          val kv = SKeyValue(table, iter.key(), SKeyValue.VertexCf, qualifier, iter.value(), System.currentTimeMillis())
+          buffer += kv
+
+          iter.next()
+        }
+        if (buffer.nonEmpty)
+          serDe.vertexDeserializer(schemaVer = HBaseType.DEFAULT_VERSION).fromKeyValues(buffer, None)
+            .filter(v => distinctColumns(v.serviceColumn))
+            .foreach { vertex =>
+              vertices += vertex
+            }
+
+      } finally {
+        iter.close()
+      }
+    }
+
+    Future.successful(vertices)
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/be83d07c/s2core/src/main/scala/org/apache/s2graph/core/utils/SafeUpdateCache.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/utils/SafeUpdateCache.scala b/s2core/src/main/scala/org/apache/s2graph/core/utils/SafeUpdateCache.scala
index 7d15078..51e45c2 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/utils/SafeUpdateCache.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/utils/SafeUpdateCache.scala
@@ -114,7 +114,9 @@ class SafeUpdateCache(val config: Config)
     cache.invalidate(cacheKey)
   }
 
-  def withCache[T <: AnyRef](key: String, broadcast: Boolean)(op: => T): T = {
+  def withCache[T <: AnyRef](key: String,
+                             broadcast: Boolean,
+                             forceUpdate: Boolean = false)(op: => T): T = {
     val cacheKey = toCacheKey(key)
     val cachedValWithTs = cache.getIfPresent(cacheKey)
 
@@ -127,7 +129,7 @@ class SafeUpdateCache(val config: Config)
       val (_cachedVal, updatedAt, isUpdating) = cachedValWithTs
       val cachedVal = _cachedVal.asInstanceOf[T]
 
-      if (toTs() < updatedAt + ttl) cachedVal // in cache TTL
+      if (!forceUpdate && toTs() < updatedAt + ttl) cachedVal // in cache TTL
       else {
         val running = isUpdating.getAndSet(true)
 

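The forceUpdate flag added to withCache above skips the TTL freshness check, so the cache
refreshes the entry even while a cached value is still inside its TTL. A small sketch, assuming
a Config that already carries the cache's size/TTL settings; loadLabels() is a hypothetical loader:

    import com.typesafe.config.ConfigFactory
    import org.apache.s2graph.core.utils.SafeUpdateCache

    // hypothetical loader standing in for a schema read
    def loadLabels(): List[String] = List("friends", "follows")

    val cache = new SafeUpdateCache(ConfigFactory.load())

    // forceUpdate = true asks the cache to refresh this entry even inside the TTL window
    val labels = cache.withCache("all-labels", broadcast = false, forceUpdate = true) {
      loadLabels()
    }
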
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/be83d07c/s2core/src/test/scala/org/apache/s2graph/core/fetcher/EdgeFetcherTest.scala
----------------------------------------------------------------------
diff --git a/s2core/src/test/scala/org/apache/s2graph/core/fetcher/EdgeFetcherTest.scala b/s2core/src/test/scala/org/apache/s2graph/core/fetcher/EdgeFetcherTest.scala
index 6d95c93..930c517 100644
--- a/s2core/src/test/scala/org/apache/s2graph/core/fetcher/EdgeFetcherTest.scala
+++ b/s2core/src/test/scala/org/apache/s2graph/core/fetcher/EdgeFetcherTest.scala
@@ -23,7 +23,7 @@ import com.typesafe.config.ConfigFactory
 import org.apache.s2graph.core.Integrate.IntegrateCommon
 import org.apache.s2graph.core.Management.JsonModel.{Index, Prop}
 import org.apache.s2graph.core.schema.Label
-import org.apache.s2graph.core.{Query, QueryParam}
+import org.apache.s2graph.core.{Query, QueryParam, ResourceManager}
 
 import scala.concurrent.duration.Duration
 import scala.concurrent.{Await, ExecutionContext}
@@ -44,10 +44,10 @@ class EdgeFetcherTest extends IntegrateCommon {
       s"""{
          |
                      | "importer": {
-         |   "${FetcherManager.ClassNameKey}": "org.apache.s2graph.core.utils.IdentityImporter"
+         |   "${ResourceManager.ClassNameKey}": "org.apache.s2graph.core.utils.IdentityImporter"
          | },
          | "fetcher": {
-         |   "${FetcherManager.ClassNameKey}": "org.apache.s2graph.core.fetcher.MemoryModelFetcher"
+         |   "${ResourceManager.ClassNameKey}": "org.apache.s2graph.core.fetcher.MemoryModelEdgeFetcher"
          | }
          |}""".stripMargin
 
@@ -68,11 +68,9 @@ class EdgeFetcherTest extends IntegrateCommon {
       "gz",
       options
     )
-    val config = ConfigFactory.parseString(options)
-    val importerFuture = graph.modelManager.importModel(label, config)(ExecutionContext.Implicits.global)
-    Await.ready(importerFuture, Duration("60 seconds"))
 
-    Thread.sleep(1000)
+    graph.management.updateEdgeFetcher(label, options)
+
 
     val vertex = graph.elementBuilder.toVertex(service.serviceName, serviceColumn.columnName, "daewon")
     val queryParam = QueryParam(labelName = labelName)

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/be83d07c/s2graphql/src/main/scala/org/apache/s2graph/graphql/GraphQLServer.scala
----------------------------------------------------------------------
diff --git a/s2graphql/src/main/scala/org/apache/s2graph/graphql/GraphQLServer.scala b/s2graphql/src/main/scala/org/apache/s2graph/graphql/GraphQLServer.scala
index 0bf62d9..eee7c93 100644
--- a/s2graphql/src/main/scala/org/apache/s2graph/graphql/GraphQLServer.scala
+++ b/s2graphql/src/main/scala/org/apache/s2graph/graphql/GraphQLServer.scala
@@ -40,7 +40,7 @@ import sangria.schema.Schema
 import spray.json.{JsBoolean, JsObject, JsString}
 
 import scala.collection.JavaConverters._
-import scala.concurrent.ExecutionContext
+import scala.concurrent.{ExecutionContext, Future}
 import scala.util.{Failure, Success, Try}
 
 object GraphQLServer {
@@ -63,17 +63,17 @@ object GraphQLServer {
 
   val schemaCache = new SafeUpdateCache(schemaConfig)
 
-  def importModel(requestJSON: spray.json.JsValue)(implicit e: ExecutionContext): Route = {
+  def updateEdgeFetcher(requestJSON: spray.json.JsValue)(implicit e: ExecutionContext): Route = {
     val ret = Try {
       val spray.json.JsObject(fields) = requestJSON
       val spray.json.JsString(labelName) = fields("label")
       val jsOptions = fields("options")
 
-      s2graph.management.importModel(labelName, jsOptions.compactPrint)
+      s2graph.management.updateEdgeFetcher(labelName, jsOptions.compactPrint)
     }
 
     ret match {
-      case Success(f) => complete(f.map(i => OK -> JsString("start")))
+      case Success(f) => complete(OK -> JsString("start"))
+      case Failure(e) => complete(InternalServerError -> spray.json.JsObject("message" -> JsString(e.toString)))
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/be83d07c/s2graphql/src/main/scala/org/apache/s2graph/graphql/HttpServer.scala
----------------------------------------------------------------------
diff --git a/s2graphql/src/main/scala/org/apache/s2graph/graphql/HttpServer.scala b/s2graphql/src/main/scala/org/apache/s2graph/graphql/HttpServer.scala
index 38cdce3..6f57cc4 100644
--- a/s2graphql/src/main/scala/org/apache/s2graph/graphql/HttpServer.scala
+++ b/s2graphql/src/main/scala/org/apache/s2graph/graphql/HttpServer.scala
@@ -44,8 +44,8 @@ object Server extends App {
 
   val route: Flow[HttpRequest, HttpResponse, Any] = (post & path("graphql")) {
     entity(as[spray.json.JsValue])(GraphQLServer.endpoint)
-  } ~ (post & path("importModel")) {
-    entity(as[spray.json.JsValue])(GraphQLServer.importModel)
+  } ~ (post & path("updateEdgeFetcher")) {
+    entity(as[spray.json.JsValue])(GraphQLServer.updateEdgeFetcher)
   } ~ {
     getFromResource("assets/graphiql.html")
   }
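
The renamed endpoint takes the same JSON shape that GraphQLServer.updateEdgeFetcher destructures
above: a "label" field plus an "options" object whose compact print is handed to
management.updateEdgeFetcher. A request-body sketch in spray-json; the label name and option
values are illustrative and mirror the fetcher block from EdgeFetcherTest:

    import org.apache.s2graph.core.ResourceManager
    import spray.json._

    // body for POST /updateEdgeFetcher; the server reads only "label" and "options"
    val payload = JsObject(
      "label" -> JsString("friends"),
      "options" -> JsObject(
        "fetcher" -> JsObject(
          ResourceManager.ClassNameKey -> JsString("org.apache.s2graph.core.fetcher.MemoryModelEdgeFetcher")
        )
      )
    )
    // payload.compactPrint is what reaches s2graph.management.updateEdgeFetcher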
