http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/be83d07c/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksEdgeFetcher.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksEdgeFetcher.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksEdgeFetcher.scala index 628c5e1..796e52e 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksEdgeFetcher.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksEdgeFetcher.scala @@ -21,10 +21,12 @@ package org.apache.s2graph.core.storage.rocks import com.typesafe.config.Config import org.apache.s2graph.core._ -import org.apache.s2graph.core.storage.{StorageIO, StorageSerDe} -import org.apache.s2graph.core.types.VertexId +import org.apache.s2graph.core.schema.Label +import org.apache.s2graph.core.storage.{SKeyValue, StorageIO, StorageSerDe} +import org.apache.s2graph.core.types.{HBaseType, VertexId} import org.rocksdb.RocksDB +import scala.collection.mutable.ArrayBuffer import scala.concurrent.{ExecutionContext, Future} class RocksEdgeFetcher(val graph: S2GraphLike, @@ -57,4 +59,33 @@ class RocksEdgeFetcher(val graph: S2GraphLike, Future.sequence(futures) } + + override def fetchEdgesAll()(implicit ec: ExecutionContext) = { + val edges = new ArrayBuffer[S2EdgeLike]() + Label.findAll().groupBy(_.hbaseTableName).toSeq.foreach { case (hTableName, labels) => + val distinctLabels = labels.toSet + + val iter = db.newIterator() + try { + iter.seekToFirst() + while (iter.isValid) { + val kv = SKeyValue(table, iter.key(), SKeyValue.EdgeCf, qualifier, iter.value, System.currentTimeMillis()) + + serDe.indexEdgeDeserializer(schemaVer = HBaseType.DEFAULT_VERSION).fromKeyValues(Seq(kv), None) + .filter(e => distinctLabels(e.innerLabel) && e.getDirection() == "out" && !e.isDegree) + .foreach { edge => + edges += edge + } + + + iter.next() + } + + } finally { + iter.close() + } + } + + Future.successful(edges) + } }
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/be83d07c/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorage.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorage.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorage.scala index 8948e13..aaf9086 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorage.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorage.scala @@ -238,16 +238,13 @@ class RocksStorage(override val graph: S2GraphLike, private lazy val optimisticEdgeFetcher = new RocksOptimisticEdgeFetcher(graph, config, db, vdb, serDe, io) private lazy val optimisticMutator = new RocksOptimisticMutator(graph, serDe, optimisticEdgeFetcher, db, vdb, lockMap) - private lazy val _mutator = new DefaultOptimisticMutator(graph, serDe, optimisticEdgeFetcher, optimisticMutator) override val management: StorageManagement = new RocksStorageManagement(config, vdb, db) override val serDe: StorageSerDe = new RocksStorageSerDe(graph) override val edgeFetcher: EdgeFetcher = new RocksEdgeFetcher(graph, config, db, vdb, serDe, io) - override val edgeBulkFetcher: EdgeBulkFetcher = new RocksEdgeBulkFetcher(graph, config, db, vdb, serDe, io) override val vertexFetcher: VertexFetcher = new RocksVertexFetcher(graph, config, db, vdb, serDe, io) - override val vertexBulkFetcher: VertexBulkFetcher = new RocksVertexBulkFetcher(graph, config, db, vdb, serDe, io) - override val edgeMutator: EdgeMutator = _mutator - override val vertexMutator: VertexMutator = _mutator + override val edgeMutator: EdgeMutator = new DefaultOptimisticEdgeMutator(graph, serDe, optimisticEdgeFetcher, optimisticMutator, io) + override val vertexMutator: VertexMutator = new DefaultOptimisticVertexMutator(graph, serDe, optimisticEdgeFetcher, optimisticMutator, io) } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/be83d07c/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksVertexBulkFetcher.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksVertexBulkFetcher.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksVertexBulkFetcher.scala deleted file mode 100644 index 20acfaa..0000000 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksVertexBulkFetcher.scala +++ /dev/null @@ -1,88 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.s2graph.core.storage.rocks - -import com.typesafe.config.Config -import org.apache.hadoop.hbase.util.Bytes -import org.apache.s2graph.core.schema.ServiceColumn -import org.apache.s2graph.core.{S2GraphLike, S2VertexLike, VertexBulkFetcher} -import org.apache.s2graph.core.storage.{SKeyValue, StorageIO, StorageSerDe} -import org.apache.s2graph.core.types.HBaseType -import org.rocksdb.RocksDB - -import scala.collection.mutable.ArrayBuffer -import scala.concurrent.{ExecutionContext, Future} - -class RocksVertexBulkFetcher(val graph: S2GraphLike, - val config: Config, - val db: RocksDB, - val vdb: RocksDB, - val serDe: StorageSerDe, - val io: StorageIO) extends VertexBulkFetcher { - import RocksStorage._ - - override def fetchVerticesAll()(implicit ec: ExecutionContext) = { - import scala.collection.mutable - - val vertices = new ArrayBuffer[S2VertexLike]() - ServiceColumn.findAll().groupBy(_.service.hTableName).toSeq.foreach { case (hTableName, columns) => - val distinctColumns = columns.toSet - - val iter = vdb.newIterator() - val buffer = mutable.ListBuffer.empty[SKeyValue] - var oldVertexIdBytes = Array.empty[Byte] - var minusPos = 0 - - try { - iter.seekToFirst() - while (iter.isValid) { - val row = iter.key() - if (!Bytes.equals(oldVertexIdBytes, 0, oldVertexIdBytes.length - minusPos, row, 0, row.length - 1)) { - if (buffer.nonEmpty) - serDe.vertexDeserializer(schemaVer = HBaseType.DEFAULT_VERSION).fromKeyValues(buffer, None) - .filter(v => distinctColumns(v.serviceColumn)) - .foreach { vertex => - vertices += vertex - } - - oldVertexIdBytes = row - minusPos = 1 - buffer.clear() - } - val kv = SKeyValue(table, iter.key(), SKeyValue.VertexCf, qualifier, iter.value(), System.currentTimeMillis()) - buffer += kv - - iter.next() - } - if (buffer.nonEmpty) - serDe.vertexDeserializer(schemaVer = HBaseType.DEFAULT_VERSION).fromKeyValues(buffer, None) - .filter(v => distinctColumns(v.serviceColumn)) - .foreach { vertex => - vertices += vertex - } - - } finally { - iter.close() - } - } - - Future.successful(vertices) - } -} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/be83d07c/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksVertexFetcher.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksVertexFetcher.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksVertexFetcher.scala index 6becd98..2d3880c 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksVertexFetcher.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksVertexFetcher.scala @@ -20,10 +20,15 @@ package org.apache.s2graph.core.storage.rocks import com.typesafe.config.Config +import org.apache.hadoop.hbase.util.Bytes import org.apache.s2graph.core._ +import org.apache.s2graph.core.schema.ServiceColumn +import org.apache.s2graph.core.storage.rocks.RocksStorage.{qualifier, table} import org.apache.s2graph.core.storage.{SKeyValue, StorageIO, StorageSerDe} +import org.apache.s2graph.core.types.HBaseType import org.rocksdb.RocksDB +import scala.collection.mutable.ArrayBuffer import scala.concurrent.{ExecutionContext, Future} class RocksVertexFetcher(val graph: S2GraphLike, @@ -58,4 +63,52 @@ class RocksVertexFetcher(val graph: S2GraphLike, Future.sequence(futures).map(_.flatten) } + + override def fetchVerticesAll()(implicit ec: ExecutionContext) = { + import scala.collection.mutable + + val vertices = new ArrayBuffer[S2VertexLike]() + ServiceColumn.findAll().groupBy(_.service.hTableName).toSeq.foreach { case (hTableName, columns) => + val distinctColumns = columns.toSet + + val iter = vdb.newIterator() + val buffer = mutable.ListBuffer.empty[SKeyValue] + var oldVertexIdBytes = Array.empty[Byte] + var minusPos = 0 + + try { + iter.seekToFirst() + while (iter.isValid) { + val row = iter.key() + if (!Bytes.equals(oldVertexIdBytes, 0, oldVertexIdBytes.length - minusPos, row, 0, row.length - 1)) { + if (buffer.nonEmpty) + serDe.vertexDeserializer(schemaVer = HBaseType.DEFAULT_VERSION).fromKeyValues(buffer, None) + .filter(v => distinctColumns(v.serviceColumn)) + .foreach { vertex => + vertices += vertex + } + + oldVertexIdBytes = row + minusPos = 1 + buffer.clear() + } + val kv = SKeyValue(table, iter.key(), SKeyValue.VertexCf, qualifier, iter.value(), System.currentTimeMillis()) + buffer += kv + + iter.next() + } + if (buffer.nonEmpty) + serDe.vertexDeserializer(schemaVer = HBaseType.DEFAULT_VERSION).fromKeyValues(buffer, None) + .filter(v => distinctColumns(v.serviceColumn)) + .foreach { vertex => + vertices += vertex + } + + } finally { + iter.close() + } + } + + Future.successful(vertices) + } } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/be83d07c/s2core/src/main/scala/org/apache/s2graph/core/utils/SafeUpdateCache.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/utils/SafeUpdateCache.scala b/s2core/src/main/scala/org/apache/s2graph/core/utils/SafeUpdateCache.scala index 7d15078..51e45c2 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/utils/SafeUpdateCache.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/utils/SafeUpdateCache.scala @@ -114,7 +114,9 @@ class SafeUpdateCache(val config: Config) cache.invalidate(cacheKey) } - def withCache[T <: AnyRef](key: String, broadcast: Boolean)(op: => T): T = { + def withCache[T <: AnyRef](key: String, + broadcast: Boolean, + forceUpdate: Boolean = false)(op: => T): T = { val cacheKey = toCacheKey(key) val cachedValWithTs = cache.getIfPresent(cacheKey) @@ -127,7 +129,7 @@ class SafeUpdateCache(val config: Config) val (_cachedVal, updatedAt, isUpdating) = cachedValWithTs val cachedVal = _cachedVal.asInstanceOf[T] - if (toTs() < updatedAt + ttl) cachedVal // in cache TTL + if (!forceUpdate && toTs() < updatedAt + ttl) cachedVal // in cache TTL else { val running = isUpdating.getAndSet(true) http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/be83d07c/s2core/src/test/scala/org/apache/s2graph/core/fetcher/EdgeFetcherTest.scala ---------------------------------------------------------------------- diff --git a/s2core/src/test/scala/org/apache/s2graph/core/fetcher/EdgeFetcherTest.scala b/s2core/src/test/scala/org/apache/s2graph/core/fetcher/EdgeFetcherTest.scala index 6d95c93..930c517 100644 --- a/s2core/src/test/scala/org/apache/s2graph/core/fetcher/EdgeFetcherTest.scala +++ b/s2core/src/test/scala/org/apache/s2graph/core/fetcher/EdgeFetcherTest.scala @@ -23,7 +23,7 @@ import com.typesafe.config.ConfigFactory import org.apache.s2graph.core.Integrate.IntegrateCommon import org.apache.s2graph.core.Management.JsonModel.{Index, Prop} import org.apache.s2graph.core.schema.Label -import org.apache.s2graph.core.{Query, QueryParam} +import org.apache.s2graph.core.{Query, QueryParam, ResourceManager} import scala.concurrent.duration.Duration import scala.concurrent.{Await, ExecutionContext} @@ -44,10 +44,10 @@ class EdgeFetcherTest extends IntegrateCommon { s"""{ | | "importer": { - | "${FetcherManager.ClassNameKey}": "org.apache.s2graph.core.utils.IdentityImporter" + | "${ResourceManager.ClassNameKey}": "org.apache.s2graph.core.utils.IdentityImporter" | }, | "fetcher": { - | "${FetcherManager.ClassNameKey}": "org.apache.s2graph.core.fetcher.MemoryModelFetcher" + | "${ResourceManager.ClassNameKey}": "org.apache.s2graph.core.fetcher.MemoryModelEdgeFetcher" | } |}""".stripMargin @@ -68,11 +68,9 @@ class EdgeFetcherTest extends IntegrateCommon { "gz", options ) - val config = ConfigFactory.parseString(options) - val importerFuture = graph.modelManager.importModel(label, config)(ExecutionContext.Implicits.global) - Await.ready(importerFuture, Duration("60 seconds")) - Thread.sleep(1000) + graph.management.updateEdgeFetcher(label, options) + val vertex = graph.elementBuilder.toVertex(service.serviceName, serviceColumn.columnName, "daewon") val queryParam = QueryParam(labelName = labelName) http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/be83d07c/s2graphql/src/main/scala/org/apache/s2graph/graphql/GraphQLServer.scala ---------------------------------------------------------------------- diff --git a/s2graphql/src/main/scala/org/apache/s2graph/graphql/GraphQLServer.scala b/s2graphql/src/main/scala/org/apache/s2graph/graphql/GraphQLServer.scala index 0bf62d9..eee7c93 100644 --- a/s2graphql/src/main/scala/org/apache/s2graph/graphql/GraphQLServer.scala +++ b/s2graphql/src/main/scala/org/apache/s2graph/graphql/GraphQLServer.scala @@ -40,7 +40,7 @@ import sangria.schema.Schema import spray.json.{JsBoolean, JsObject, JsString} import scala.collection.JavaConverters._ -import scala.concurrent.ExecutionContext +import scala.concurrent.{ExecutionContext, Future} import scala.util.{Failure, Success, Try} object GraphQLServer { @@ -63,17 +63,17 @@ object GraphQLServer { val schemaCache = new SafeUpdateCache(schemaConfig) - def importModel(requestJSON: spray.json.JsValue)(implicit e: ExecutionContext): Route = { + def updateEdgeFetcher(requestJSON: spray.json.JsValue)(implicit e: ExecutionContext): Route = { val ret = Try { val spray.json.JsObject(fields) = requestJSON val spray.json.JsString(labelName) = fields("label") val jsOptions = fields("options") - s2graph.management.importModel(labelName, jsOptions.compactPrint) + s2graph.management.updateEdgeFetcher(labelName, jsOptions.compactPrint) } ret match { - case Success(f) => complete(f.map(i => OK -> JsString("start"))) + case Success(f) => complete(OK -> JsString("start")) case Failure(e) => complete(InternalServerError -> spray.json.JsObject("message" -> JsString(e.toString))) } } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/be83d07c/s2graphql/src/main/scala/org/apache/s2graph/graphql/HttpServer.scala ---------------------------------------------------------------------- diff --git a/s2graphql/src/main/scala/org/apache/s2graph/graphql/HttpServer.scala b/s2graphql/src/main/scala/org/apache/s2graph/graphql/HttpServer.scala index 38cdce3..6f57cc4 100644 --- a/s2graphql/src/main/scala/org/apache/s2graph/graphql/HttpServer.scala +++ b/s2graphql/src/main/scala/org/apache/s2graph/graphql/HttpServer.scala @@ -44,8 +44,8 @@ object Server extends App { val route: Flow[HttpRequest, HttpResponse, Any] = (post & path("graphql")) { entity(as[spray.json.JsValue])(GraphQLServer.endpoint) - } ~ (post & path("importModel")) { - entity(as[spray.json.JsValue])(GraphQLServer.importModel) + } ~ (post & path("updateEdgeFetcher")) { + entity(as[spray.json.JsValue])(GraphQLServer.updateEdgeFetcher) } ~ { getFromResource("assets/graphiql.html") }
