Repository: incubator-s2graph
Updated Branches:
  refs/heads/master 16f18ffe0 -> 3cc98da8a


add RocksStorage.


Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/4ea9a8e7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/4ea9a8e7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/4ea9a8e7

Branch: refs/heads/master
Commit: 4ea9a8e75d94e240c8635fa1f9ab641a61ffca3b
Parents: 16f18ff
Author: DO YUNG YOON <[email protected]>
Authored: Sun Nov 19 13:00:07 2017 +0900
Committer: DO YUNG YOON <[email protected]>
Committed: Sun Nov 19 13:00:07 2017 +0900

----------------------------------------------------------------------
 s2core/build.sbt                                |   1 +
 .../scala/org/apache/s2graph/core/S2Graph.scala |  10 +-
 .../s2graph/core/storage/StorageReadable.scala  |   2 +-
 .../hbase/AsynchbaseStorageReadable.scala       |   2 +-
 .../core/storage/rocks/RocksHelper.scala        |  45 +++++
 .../core/storage/rocks/RocksStorage.scala       | 141 ++++++++++++++
 .../storage/rocks/RocksStorageManagement.scala  |  26 +++
 .../storage/rocks/RocksStorageReadable.scala    | 189 +++++++++++++++++++
 .../core/storage/rocks/RocksStorageSerDe.scala  |  33 ++++
 .../storage/rocks/RocksStorageWritable.scala    | 110 +++++++++++
 .../core/storage/rocks/RocksStorageTest.scala   |  33 ++++
 11 files changed, 587 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/4ea9a8e7/s2core/build.sbt
----------------------------------------------------------------------
diff --git a/s2core/build.sbt b/s2core/build.sbt
index b4273ba..99cf61d 100644
--- a/s2core/build.sbt
+++ b/s2core/build.sbt
@@ -49,6 +49,7 @@ libraryDependencies ++= Seq(
   "org.apache.hadoop" % "hadoop-hdfs" % hadoopVersion ,
   "org.apache.lucene" % "lucene-core" % "6.6.0",
   "org.apache.lucene" % "lucene-queryparser" % "6.6.0",
+  "org.rocksdb" % "rocksdbjni" % "5.8.0"
   "org.scala-lang.modules" %% "scala-java8-compat" % "0.8.0"
 )
 

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/4ea9a8e7/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
index 3905442..2fc36b6 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
@@ -29,6 +29,7 @@ import org.apache.s2graph.core.index.IndexProvider
 import org.apache.s2graph.core.io.tinkerpop.optimize.S2GraphStepStrategy
 import org.apache.s2graph.core.mysqls._
 import org.apache.s2graph.core.storage.hbase.AsynchbaseStorage
+import org.apache.s2graph.core.storage.rocks.RocksStorage
 import org.apache.s2graph.core.storage.{MutateResponse, Storage}
 import org.apache.s2graph.core.types._
 import org.apache.s2graph.core.utils.{DeferCache, Extensions, logger}
@@ -37,10 +38,10 @@ import org.apache.tinkerpop.gremlin.structure.{Direction, Edge, Graph}
 
 import scala.collection.JavaConversions._
 import scala.collection.mutable
-import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.{ArrayBuffer, ListBuffer}
 import scala.concurrent._
 import scala.concurrent.duration.Duration
-import scala.util.Try
+import scala.util.{Random, Try}
 
 
 object S2Graph {
@@ -81,7 +82,9 @@ object S2Graph {
     "query.future.cache.expire.after.write" -> 
java.lang.Integer.valueOf(10000),
     "query.future.cache.expire.after.access" -> 
java.lang.Integer.valueOf(5000),
     "query.future.cache.metric.interval" -> java.lang.Integer.valueOf(60000),
-    "s2graph.storage.backend" -> "hbase",
+//    "s2graph.storage.backend" -> "hbase",
+    "s2graph.storage.backend" -> "rocks",
+    RocksStorage.FilePathKey -> "rocks_db",
     "query.hardlimit" -> java.lang.Integer.valueOf(100000),
     "hbase.zookeeper.znode.parent" -> "/hbase",
     "query.log.sample.rate" -> Double.box(0.05)
@@ -140,6 +143,7 @@ object S2Graph {
             null
 
         new AsynchbaseStorage(graph, config)
+      case "rocks" => new RocksStorage(graph, config)
       case _ => throw new RuntimeException("not supported storage.")
     }
   }
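
For reference, selecting the new backend is purely a configuration change; a minimal sketch using only the two keys from this commit ("s2graph.storage.backend" and RocksStorage.FilePathKey = "rocks.storage.file.path"). The local path and the fallback config are illustrative assumptions, not part of the change:

  import com.typesafe.config.ConfigFactory
  import scala.collection.JavaConverters._

  // Route storage to the new "rocks" case shown above.
  val overrides = Map(
    "s2graph.storage.backend" -> "rocks",
    "rocks.storage.file.path" -> "/tmp/s2graph_rocks"  // hypothetical local path
  )
  val config = ConfigFactory.parseMap(overrides.asJava).withFallback(ConfigFactory.load())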

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/4ea9a8e7/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageReadable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageReadable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageReadable.scala
index 79ca8aa..0965f68 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageReadable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageReadable.scala
@@ -27,8 +27,8 @@ import org.apache.s2graph.core.utils.logger
 import scala.concurrent.{ExecutionContext, Future}
 
 trait StorageReadable {
-  val serDe: StorageSerDe
   val io: StorageIO
+  val serDe: StorageSerDe
  /**
    * responsible to fire parallel fetch call into storage and create future that will return merged result.
     *

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/4ea9a8e7/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageReadable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageReadable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageReadable.scala
index af82439..b1fdd7c 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageReadable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageReadable.scala
@@ -41,7 +41,7 @@ import scala.concurrent.{ExecutionContext, Future}
 class AsynchbaseStorageReadable(val graph: S2GraphLike,
                                 val config: Config,
                                 val client: HBaseClient,
-                                val serDe: StorageSerDe,
+                                override val serDe: StorageSerDe,
                                 override val io: StorageIO) extends StorageReadable {
   import Extensions.DeferOps
   import CanDefer._

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/4ea9a8e7/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksHelper.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksHelper.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksHelper.scala
new file mode 100644
index 0000000..1054683
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksHelper.scala
@@ -0,0 +1,45 @@
+package org.apache.s2graph.core.storage.rocks
+
+import java.nio.{ByteBuffer, ByteOrder}
+
+import org.apache.s2graph.core.QueryParam
+
+object RocksHelper {
+
+  def intToBytes(value: Int): Array[Byte] = {
+    val intBuffer = ByteBuffer.allocate(4).order(ByteOrder.nativeOrder())
+    intBuffer.clear()
+    intBuffer.putInt(value)
+    intBuffer.array()
+  }
+
+  def longToBytes(value: Long): Array[Byte] = {
+    val longBuffer = ByteBuffer.allocate(8).order(ByteOrder.nativeOrder())
+    longBuffer.clear()
+    longBuffer.putLong(value)
+    longBuffer.array()
+  }
+
+  def bytesToInt(data: Array[Byte], offset: Int): Int = {
+    if (data != null) {
+      val intBuffer = ByteBuffer.allocate(4).order(ByteOrder.nativeOrder())
+      intBuffer.put(data, offset, 4)
+      intBuffer.flip()
+      intBuffer.getInt()
+    } else 0
+  }
+
+  def bytesToLong(data: Array[Byte], offset: Int): Long = {
+    if (data != null) {
+      val longBuffer = ByteBuffer.allocate(8).order(ByteOrder.nativeOrder())
+      longBuffer.put(data, offset, 8)
+      longBuffer.flip()
+      longBuffer.getLong()
+    } else 0L
+  }
+
+  case class ScanWithRange(queryParam: QueryParam, startKey: Array[Byte], stopKey: Array[Byte], offset: Int, limit: Int)
+  case class GetRequest(cf: Array[Byte], key: Array[Byte])
+
+  type RocksRPC = Either[GetRequest, ScanWithRange]
+}
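
The helpers above encode in the platform's native byte order, so values must be decoded by the same helpers. A quick round-trip sketch (illustrative only, not part of the commit):

  import org.apache.s2graph.core.storage.rocks.RocksHelper

  val longBytes = RocksHelper.longToBytes(42L)
  assert(RocksHelper.bytesToLong(longBytes, 0) == 42L)

  val intBytes = RocksHelper.intToBytes(7)
  assert(RocksHelper.bytesToInt(intBytes, 0) == 7)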

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/4ea9a8e7/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorage.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorage.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorage.scala
new file mode 100644
index 0000000..c9a0b26
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorage.scala
@@ -0,0 +1,141 @@
+package org.apache.s2graph.core.storage.rocks
+
+import java.util.concurrent.TimeUnit
+import java.util.concurrent.locks.ReentrantLock
+
+import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}
+import com.google.common.hash.Hashing
+import com.typesafe.config.Config
+import org.apache.s2graph.core._
+import org.apache.s2graph.core.storage.Storage
+import org.apache.s2graph.core.storage.rocks.RocksHelper.RocksRPC
+import org.apache.s2graph.core.utils.logger
+import org.rocksdb._
+import org.rocksdb.util.SizeUnit
+
+import scala.util.{Random, Try}
+
+object RocksStorage {
+
+  RocksDB.loadLibrary()
+
+  val LockExpireDuration = 60000
+  val FilePathKey = "rocks.storage.file.path"
+  val StorageModeKey = "rocks.storage.mode"
+  val TtlKey = "rocks.storage.ttl"
+  val ReadOnlyKey = "rocks.storage.read.only"
+  val VertexPostfix = "vertex"
+  val EdgePostfix = "edge"
+
+  val dbPool = CacheBuilder.newBuilder()
+    .concurrencyLevel(8)
+    .expireAfterAccess(LockExpireDuration, TimeUnit.MILLISECONDS)
+    .expireAfterWrite(LockExpireDuration, TimeUnit.MILLISECONDS)
+    .maximumSize(1000 * 10 * 10 * 10 * 10)
+    .build[String, (RocksDB, RocksDB)]
+
+//  val writeOptions = new WriteOptions()
+
+  val tableOptions = new BlockBasedTableConfig
+  tableOptions
+    .setBlockCacheSize(1 * SizeUnit.GB)
+    .setCacheIndexAndFilterBlocks(true)
+    .setHashIndexAllowCollision(false)
+
+  val options = new Options
+  options
+    .setCreateIfMissing(true)
+    .setMergeOperatorName("uint64add")
+    .setTableCacheNumshardbits(5)
+    .setIncreaseParallelism(8)
+    .setArenaBlockSize(1024 * 32)
+    .setTableFormatConfig(tableOptions)
+    .setWriteBufferSize(1024 * 1024 * 512)
+
+  def configKey(config: Config): Long =
+    Hashing.murmur3_128().hashBytes(config.toString.getBytes("UTF-8")).asLong()
+
+  def getFilePath(config: Config): String = config.getString(FilePathKey)
+
+  def getOrElseUpdate(config: Config): (RocksDB, RocksDB) = {
+    val path = config.getString(FilePathKey)
+    val storageMode = Try { config.getString(StorageModeKey) }.getOrElse("test")
+    val ttl = Try { config.getInt(TtlKey) }.getOrElse(-1)
+    val readOnly = Try { config.getBoolean(ReadOnlyKey) } getOrElse(false)
+
+    val newPair = (
+      openDatabase(path, options, VertexPostfix, storageMode, ttl, readOnly),
+      openDatabase(path, options, EdgePostfix, storageMode, ttl, readOnly)
+    )
+    dbPool.asMap().putIfAbsent(path, newPair) match {
+      case null => newPair
+      case old =>
+        newPair._1.close()
+        newPair._2.close()
+        old
+    }
+  }
+
+  def shutdownAll(): Unit = {
+    import scala.collection.JavaConversions._
+    dbPool.asMap.foreach { case (k, (vdb, edb)) =>
+        vdb.close()
+        edb.close()
+    }
+  }
+
+  private def openDatabase(path: String,
+                           options: Options,
+                           postfix: String = "",
+                           storageMode: String = "production",
+                           ttl: Int = -1,
+                           readOnly: Boolean = false): RocksDB = {
+    try {
+      val filePath = s"${path}_${postfix}"
+      logger.info(s"Open RocksDB: ${filePath}")
+
+      storageMode match {
+        case "test" => RocksDB.open(options, 
s"${filePath}_${Random.nextInt()}")
+        case _ =>
+          if (ttl < 0) {
+            if (readOnly) RocksDB.openReadOnly(options, filePath)
+            else RocksDB.open(options, filePath)
+          } else {
+            TtlDB.open(options, filePath, ttl, readOnly)
+          }
+      }
+    } catch {
+      case e: RocksDBException =>
+        logger.error(s"initialize rocks db storage failed.", e)
+        throw e
+    }
+  }
+}
+
+class RocksStorage(override val graph: S2GraphLike,
+                   override val config: Config) extends Storage(graph, config) {
+
+  import RocksStorage._
+  var closed = false
+
+  lazy val (vdb, db) = getOrElseUpdate(config)
+
+  val cacheLoader = new CacheLoader[String, ReentrantLock] {
+    override def load(key: String) = new ReentrantLock()
+  }
+
+  val lockMap: LoadingCache[String, ReentrantLock] = CacheBuilder.newBuilder()
+    .concurrencyLevel(8)
+    .expireAfterAccess(graph.LockExpireDuration, TimeUnit.MILLISECONDS)
+    .expireAfterWrite(graph.LockExpireDuration, TimeUnit.MILLISECONDS)
+    .maximumSize(1000 * 10 * 10 * 10 * 10)
+    .build[String, ReentrantLock](cacheLoader)
+
+  override val management = new RocksStorageManagement(config, vdb, db)
+
+  override val mutator = new RocksStorageWritable(db, vdb, lockMap)
+
+  override val serDe = new RocksStorageSerDe(graph)
+
+  override val fetcher = new RocksStorageReadable(graph, config, db, vdb, serDe, io)
+}
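
A minimal sketch of obtaining the pooled (vertex DB, edge DB) pair directly via RocksStorage.getOrElseUpdate, assuming a local file path. The path below is hypothetical; in the default "test" storage mode each open appends a random suffix to the directory name:

  import com.typesafe.config.ConfigFactory
  import org.apache.s2graph.core.storage.rocks.RocksStorage
  import scala.collection.JavaConverters._

  val conf = ConfigFactory.parseMap(Map(
    RocksStorage.FilePathKey -> "/tmp/s2graph_rocks"  // hypothetical path
  ).asJava)

  val (vertexDb, edgeDb) = RocksStorage.getOrElseUpdate(conf)
  // ... read/write through the handles ...
  RocksStorage.shutdownAll()  // closes every pooled database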

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/4ea9a8e7/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorageManagement.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorageManagement.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorageManagement.scala
new file mode 100644
index 0000000..3b9d4f8
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorageManagement.scala
@@ -0,0 +1,26 @@
+package org.apache.s2graph.core.storage.rocks
+
+import com.typesafe.config.Config
+import org.apache.s2graph.core.storage.StorageManagement
+import org.rocksdb.RocksDB
+
+class RocksStorageManagement(val config: Config,
+                             val vdb: RocksDB,
+                             val db: RocksDB) extends StorageManagement {
+  val path = RocksStorage.getFilePath(config)
+
+
+  override def flush(): Unit = {
+    vdb.close()
+    db.close()
+    RocksStorage.dbPool.asMap().remove(path)
+  }
+
+  override def createTable(config: Config, tableNameStr: String): Unit = {}
+
+  override def truncateTable(config: Config, tableNameStr: String): Unit = {}
+
+  override def deleteTable(config: Config, tableNameStr: String): Unit = {}
+
+  override def shutdown(): Unit = flush()
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/4ea9a8e7/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorageReadable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorageReadable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorageReadable.scala
new file mode 100644
index 0000000..194b3d2
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorageReadable.scala
@@ -0,0 +1,189 @@
+package org.apache.s2graph.core.storage.rocks
+
+import java.util.Base64
+
+import com.typesafe.config.Config
+import org.apache.hadoop.hbase.util.Bytes
+import org.apache.s2graph.core._
+import org.apache.s2graph.core.mysqls.{Label, ServiceColumn}
+import org.apache.s2graph.core.storage.rocks.RocksHelper.{GetRequest, RocksRPC, ScanWithRange}
+import org.apache.s2graph.core.storage.serde.StorageSerializable
+import org.apache.s2graph.core.storage.{SKeyValue, StorageIO, StorageReadable, StorageSerDe}
+import org.apache.s2graph.core.types.{HBaseType, VertexId}
+import org.rocksdb.RocksDB
+
+import scala.collection.mutable.ArrayBuffer
+import scala.concurrent.{ExecutionContext, Future}
+
+class RocksStorageReadable(val graph: S2GraphLike,
+                           val config: Config,
+                           val db: RocksDB,
+                           val vdb: RocksDB,
+                           val serDe: StorageSerDe,
+                           override val io: StorageIO) extends StorageReadable {
+
+  private val table = Array.emptyByteArray
+  private val qualifier = Array.emptyByteArray
+
+  private def buildRequest(queryRequest: QueryRequest, edge: S2EdgeLike): RocksRPC = {
+    queryRequest.queryParam.tgtVertexInnerIdOpt match {
+      case None => // indexEdges
+        val queryParam = queryRequest.queryParam
+        val indexEdgeOpt = edge.edgesWithIndex.filter(edgeWithIndex => edgeWithIndex.labelIndex.seq == queryParam.labelOrderSeq).headOption
+        val indexEdge = indexEdgeOpt.getOrElse(throw new RuntimeException(s"Can't find index for query $queryParam"))
+        val srcIdBytes = VertexId.toSourceVertexId(indexEdge.srcVertex.id).bytes
+        val labelWithDirBytes = indexEdge.labelWithDir.bytes
+        val labelIndexSeqWithIsInvertedBytes = StorageSerializable.labelOrderSeqWithIsInverted(indexEdge.labelIndexSeq, isInverted = false)
+
+        val baseKey = Bytes.add(srcIdBytes, labelWithDirBytes, labelIndexSeqWithIsInvertedBytes)
+
+        val (intervalMaxBytes, intervalMinBytes) = queryParam.buildInterval(Option(edge))
+        val (startKey, stopKey) =
+          if (queryParam.intervalOpt.isDefined) {
+            val _startKey = queryParam.cursorOpt match {
+              case Some(cursor) => Base64.getDecoder.decode(cursor)
+              case None => Bytes.add(baseKey, intervalMaxBytes)
+            }
+            (_startKey, Bytes.add(baseKey, intervalMinBytes))
+          } else {
+            val _startKey = queryParam.cursorOpt match {
+              case Some(cursor) => Base64.getDecoder.decode(cursor)
+              case None => baseKey
+            }
+            (_startKey, Bytes.add(baseKey, Array.fill(1)(-1)))
+          }
+
+        Right(ScanWithRange(queryParam, startKey, stopKey, queryParam.offset, queryParam.limit))
+
+      case Some(tgtId) => // snapshotEdge
+        val kv = serDe.snapshotEdgeSerializer(graph.elementBuilder.toRequestEdge(queryRequest, Nil).toSnapshotEdge).toKeyValues.head
+        Left(GetRequest(SKeyValue.EdgeCf, kv.row))
+    }
+  }
+
+  private def buildRequest(queryRequest: QueryRequest, vertex: S2VertexLike): RocksRPC = {
+    val kv = serDe.vertexSerializer(vertex).toKeyValues.head
+    Left(GetRequest(SKeyValue.VertexCf, kv.row))
+  }
+
+  override def fetches(queryRequests: Seq[QueryRequest], prevStepEdges: Map[VertexId, Seq[EdgeWithScore]])(implicit ec: ExecutionContext): Future[Seq[StepResult]] = {
+    val futures = for {
+      queryRequest <- queryRequests
+    } yield {
+      val parentEdges = prevStepEdges.getOrElse(queryRequest.vertex.id, Nil)
+      val edge = graph.elementBuilder.toRequestEdge(queryRequest, parentEdges)
+      val rpc = buildRequest(queryRequest, edge)
+      fetchKeyValues(rpc).map { kvs =>
+        val queryParam = queryRequest.queryParam
+        val stepResult = io.toEdges(kvs, queryRequest, queryRequest.prevStepScore, false, parentEdges)
+        val edgeWithScores = stepResult.edgeWithScores.filter { case edgeWithScore =>
+          val edge = edgeWithScore.edge
+          val duration = queryParam.durationOpt.getOrElse((Long.MinValue, Long.MaxValue))
+          edge.ts >= duration._1 && edge.ts < duration._2
+        }
+
+        stepResult.copy(edgeWithScores = edgeWithScores)
+      }
+    }
+
+    Future.sequence(futures)
+  }
+
+  private def fetchKeyValues(rpc: RocksRPC)(implicit ec: ExecutionContext): Future[Seq[SKeyValue]] = {
+    rpc match {
+      case Left(GetRequest(cf, key)) =>
+        val _db = if (Bytes.equals(cf, SKeyValue.VertexCf)) vdb else db
+        val v = _db.get(key)
+
+        val kvs =
+          if (v == null) Seq.empty
+          else Seq(SKeyValue(table, key, cf, qualifier, v, System.currentTimeMillis()))
+
+        Future.successful(kvs)
+      case Right(ScanWithRange(queryParam, startKey, stopKey, offset, limit)) =>
+        val kvs = new ArrayBuffer[SKeyValue]()
+        val iter = db.newIterator()
+        try {
+          var idx = 0
+          iter.seek(startKey)
+          val (startOffset, len) = (queryParam.innerOffset, queryParam.innerLimit)
+          while (iter.isValid && Bytes.compareTo(iter.key, stopKey) <= 0 && idx < startOffset + len) {
+            if (idx >= startOffset) {
+              kvs += SKeyValue(table, iter.key, SKeyValue.EdgeCf, qualifier, iter.value, System.currentTimeMillis())
+            }
+
+            iter.next()
+            idx += 1
+          }
+        } finally {
+          iter.close()
+        }
+
+        Future.successful(kvs)
+    }
+  }
+
+  override def fetchEdgesAll()(implicit ec: ExecutionContext) = {
+    val edges = new ArrayBuffer[S2EdgeLike]()
+    Label.findAll().groupBy(_.hbaseTableName).toSeq.foreach { case (hTableName, labels) =>
+      val distinctLabels = labels.toSet
+
+      val iter = db.newIterator()
+      try {
+        iter.seekToFirst()
+        while (iter.isValid) {
+          val kv = SKeyValue(table, iter.key(), SKeyValue.EdgeCf, qualifier, iter.value, System.currentTimeMillis())
+
+          serDe.indexEdgeDeserializer(schemaVer = HBaseType.DEFAULT_VERSION).fromKeyValues(Seq(kv), None)
+            .filter(e => distinctLabels(e.innerLabel) && e.getDirection() == "out" && !e.isDegree)
+            .foreach { edge =>
+              edges += edge
+            }
+
+
+          iter.next()
+        }
+
+      } finally {
+        iter.close()
+      }
+    }
+
+    Future.successful(edges)
+  }
+
+  override def fetchVerticesAll()(implicit ec: ExecutionContext) = {
+    val vertices = new ArrayBuffer[S2VertexLike]()
+    ServiceColumn.findAll().groupBy(_.service.hTableName).toSeq.foreach { case (hTableName, columns) =>
+      val distinctColumns = columns.toSet
+
+      val iter = vdb.newIterator()
+      try {
+        iter.seekToFirst()
+        while (iter.isValid) {
+          val kv = SKeyValue(table, iter.key(), SKeyValue.VertexCf, qualifier, iter.value(), System.currentTimeMillis())
+
+          serDe.vertexDeserializer(schemaVer = HBaseType.DEFAULT_VERSION).fromKeyValues(Seq(kv), None)
+            .filter(v => distinctColumns(v.serviceColumn))
+            .foreach { vertex =>
+              vertices += vertex
+            }
+
+          iter.next()
+        }
+      } finally {
+        iter.close()
+      }
+    }
+
+    Future.successful(vertices)
+  }
+
+  override def fetchKeyValues(queryRequest: QueryRequest, edge: S2EdgeLike)(implicit ec: ExecutionContext) = {
+    fetchKeyValues(buildRequest(queryRequest, edge))
+  }
+
+  override def fetchKeyValues(queryRequest: QueryRequest, vertex: S2VertexLike)(implicit ec: ExecutionContext) = {
+    fetchKeyValues(buildRequest(queryRequest, vertex))
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/4ea9a8e7/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorageSerDe.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorageSerDe.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorageSerDe.scala
new file mode 100644
index 0000000..833c857
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorageSerDe.scala
@@ -0,0 +1,33 @@
+package org.apache.s2graph.core.storage.rocks
+
+import org.apache.s2graph.core._
+import org.apache.s2graph.core.storage.{StorageIO, StorageSerDe, serde}
+import org.apache.s2graph.core.types.HBaseType
+
+class RocksStorageSerDe(val graph: S2GraphLike) extends StorageSerDe {
+
+  override def snapshotEdgeSerializer(snapshotEdge: SnapshotEdge) =
+    new serde.snapshotedge.tall.SnapshotEdgeSerializable(snapshotEdge)
+
+  override def indexEdgeSerializer(indexEdge: IndexEdge) =
+    new serde.indexedge.tall.IndexEdgeSerializable(indexEdge, RocksHelper.longToBytes)
+
+  override def vertexSerializer(vertex: S2VertexLike) =
+      new serde.vertex.tall.VertexSerializable(vertex, RocksHelper.intToBytes)
+
+
+  private val snapshotEdgeDeserializer = new serde.snapshotedge.tall.SnapshotEdgeDeserializable(graph)
+  override def snapshotEdgeDeserializer(schemaVer: String) = snapshotEdgeDeserializer
+
+  private val indexEdgeDeserializable =
+    new serde.indexedge.tall.IndexEdgeDeserializable(graph,
+      RocksHelper.bytesToLong, tallSchemaVersions = HBaseType.ValidVersions.toSet)
+
+  override def indexEdgeDeserializer(schemaVer: String) = indexEdgeDeserializable
+
+  private val vertexDeserializer =
+      new serde.vertex.tall.VertexDeserializable(graph)
+
+  override def vertexDeserializer(schemaVer: String) = vertexDeserializer
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/4ea9a8e7/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorageWritable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorageWritable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorageWritable.scala
new file mode 100644
index 0000000..1057781
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorageWritable.scala
@@ -0,0 +1,110 @@
+package org.apache.s2graph.core.storage.rocks
+
+import java.util.concurrent.locks.ReentrantLock
+
+import com.google.common.cache.{Cache, LoadingCache}
+import org.apache.hadoop.hbase.util.Bytes
+import org.apache.s2graph.core.storage.{MutateResponse, SKeyValue, StorageWritable}
+import org.apache.s2graph.core.utils.logger
+import org.rocksdb.{RocksDB, RocksDBException, WriteBatch, WriteOptions}
+
+import scala.concurrent.{ExecutionContext, Future}
+
+class RocksStorageWritable(val db: RocksDB,
+                           val vdb: RocksDB,
+                           val lockMap: LoadingCache[String, ReentrantLock]) extends StorageWritable {
+
+  override def writeToStorage(cluster: String, kvs: Seq[SKeyValue], withWait: Boolean)(implicit ec: ExecutionContext) = {
+    if (kvs.isEmpty) {
+      Future.successful(MutateResponse.Success)
+    } else {
+      val ret = {
+        val (kvsV, kvsE) = kvs.partition(kv => Bytes.equals(kv.cf, SKeyValue.VertexCf))
+        val writeBatchV = buildWriteBatch(kvsV)
+        val writeBatchE = buildWriteBatch(kvsE)
+        val writeOptions = new WriteOptions
+        try {
+          vdb.write(writeOptions, writeBatchV)
+          db.write(writeOptions, writeBatchE)
+          true
+        } catch {
+          case e: Exception =>
+            logger.error(s"writeAsyncSimple failed.", e)
+            false
+        } finally {
+          writeBatchV.close()
+          writeBatchE.close()
+          writeOptions.close()
+        }
+      }
+
+      Future.successful(new MutateResponse(ret))
+    }
+  }
+
+
+  override def writeLock(requestKeyValue: SKeyValue, expectedOpt: Option[SKeyValue])(implicit ec: ExecutionContext) = {
+    def op = {
+      val writeOptions = new WriteOptions
+      try {
+        val fetchedValue = db.get(requestKeyValue.row)
+        val innerRet = expectedOpt match {
+          case None =>
+            if (fetchedValue == null) {
+
+              db.put(writeOptions, requestKeyValue.row, requestKeyValue.value)
+              true
+            } else {
+              false
+            }
+          case Some(kv) =>
+            if (fetchedValue == null) {
+              false
+            } else {
+              if (Bytes.compareTo(fetchedValue, kv.value) == 0) {
+                db.put(writeOptions, requestKeyValue.row, requestKeyValue.value)
+                true
+              } else {
+                false
+              }
+            }
+        }
+
+        Future.successful(new MutateResponse(innerRet))
+      } catch {
+        case e: RocksDBException =>
+          logger.error(s"Write lock failed", e)
+          Future.successful(MutateResponse.Failure)
+      } finally {
+        writeOptions.close()
+      }
+    }
+
+    withLock(requestKeyValue.row)(op)
+  }
+
+  private def buildWriteBatch(kvs: Seq[SKeyValue]): WriteBatch = {
+    val writeBatch = new WriteBatch()
+    kvs.foreach { kv =>
+      kv.operation match {
+        case SKeyValue.Put => writeBatch.put(kv.row, kv.value)
+        case SKeyValue.Delete => writeBatch.remove(kv.row)
+        case SKeyValue.Increment => writeBatch.merge(kv.row, kv.value)
+        case _ => throw new RuntimeException(s"not supported rpc operation. ${kv.operation}")
+      }
+    }
+    writeBatch
+  }
+
+  private def withLock[A](key: Array[Byte])(op: => A): A = {
+    val lockKey = Bytes.toString(key)
+    val lock = lockMap.get(lockKey)
+
+    try {
+      lock.lock()
+      op
+    } finally {
+      lock.unlock()
+    }
+  }
+}
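
writeLock above serializes writers per row key to get compare-and-set semantics on plain RocksDB puts. A generic sketch of that lock-striping pattern with a Guava loading cache (names are illustrative, not the commit's API):

  import java.util.concurrent.TimeUnit
  import java.util.concurrent.locks.ReentrantLock
  import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}

  // One ReentrantLock per key, dropped after a period of inactivity.
  val locks: LoadingCache[String, ReentrantLock] = CacheBuilder.newBuilder()
    .expireAfterAccess(60000, TimeUnit.MILLISECONDS)
    .build[String, ReentrantLock](new CacheLoader[String, ReentrantLock] {
      override def load(key: String) = new ReentrantLock()
    })

  def withLock[A](key: String)(op: => A): A = {
    val lock = locks.get(key)
    lock.lock()
    try op finally lock.unlock()
  }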

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/4ea9a8e7/s2core/src/test/scala/org/apache/s2graph/core/storage/rocks/RocksStorageTest.scala
----------------------------------------------------------------------
diff --git a/s2core/src/test/scala/org/apache/s2graph/core/storage/rocks/RocksStorageTest.scala b/s2core/src/test/scala/org/apache/s2graph/core/storage/rocks/RocksStorageTest.scala
new file mode 100644
index 0000000..0f36b9d
--- /dev/null
+++ b/s2core/src/test/scala/org/apache/s2graph/core/storage/rocks/RocksStorageTest.scala
@@ -0,0 +1,33 @@
+package org.apache.s2graph.core.storage.rocks
+
+import org.apache.s2graph.core.TestCommonWithModels
+import org.apache.s2graph.core.mysqls.{Service, ServiceColumn}
+import org.apache.tinkerpop.gremlin.structure.T
+import org.scalatest.{FunSuite, Matchers}
+
+import scala.collection.JavaConversions._
+
+class RocksStorageTest extends FunSuite with Matchers with TestCommonWithModels {
+  initTests()
+
+//  test("VertexTest: shouldNotGetConcurrentModificationException()") {
+//    val service = Service.findByName(serviceName, useCache = false).getOrElse {
+//      throw new IllegalStateException("service not found.")
+//    }
+//    val column = ServiceColumn.find(service.id.get, columnName).getOrElse {
+//      throw new IllegalStateException("column not found.")
+//    }
+//
+//    val vertexId = graph.elementBuilder.newVertexId(service, column, 1L)
+//
+//    val vertex = graph.elementBuilder.newVertex(vertexId)
+//    for (i <- (0 until 10)) {
+//      vertex.addEdge(labelName, vertex)
+//    }
+//
+//    println(graph.edges().toSeq)
+//    println("*" * 100)
+//    vertex.remove()
+//    println(graph.vertices().toSeq)
+//  }
+}
