Repository: incubator-s2graph Updated Branches: refs/heads/master 16f18ffe0 -> 3cc98da8a
add RocksStorage. Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/4ea9a8e7 Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/4ea9a8e7 Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/4ea9a8e7 Branch: refs/heads/master Commit: 4ea9a8e75d94e240c8635fa1f9ab641a61ffca3b Parents: 16f18ff Author: DO YUNG YOON <[email protected]> Authored: Sun Nov 19 13:00:07 2017 +0900 Committer: DO YUNG YOON <[email protected]> Committed: Sun Nov 19 13:00:07 2017 +0900 ---------------------------------------------------------------------- s2core/build.sbt | 1 + .../scala/org/apache/s2graph/core/S2Graph.scala | 10 +- .../s2graph/core/storage/StorageReadable.scala | 2 +- .../hbase/AsynchbaseStorageReadable.scala | 2 +- .../core/storage/rocks/RocksHelper.scala | 45 +++++ .../core/storage/rocks/RocksStorage.scala | 141 ++++++++++++++ .../storage/rocks/RocksStorageManagement.scala | 26 +++ .../storage/rocks/RocksStorageReadable.scala | 189 +++++++++++++++++++ .../core/storage/rocks/RocksStorageSerDe.scala | 33 ++++ .../storage/rocks/RocksStorageWritable.scala | 110 +++++++++++ .../core/storage/rocks/RocksStorageTest.scala | 33 ++++ 11 files changed, 587 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/4ea9a8e7/s2core/build.sbt ---------------------------------------------------------------------- diff --git a/s2core/build.sbt b/s2core/build.sbt index b4273ba..99cf61d 100644 --- a/s2core/build.sbt +++ b/s2core/build.sbt @@ -49,6 +49,7 @@ libraryDependencies ++= Seq( "org.apache.hadoop" % "hadoop-hdfs" % hadoopVersion , "org.apache.lucene" % "lucene-core" % "6.6.0", "org.apache.lucene" % "lucene-queryparser" % "6.6.0", + "org.rocksdb" % "rocksdbjni" % "5.8.0" "org.scala-lang.modules" %% "scala-java8-compat" % "0.8.0" ) http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/4ea9a8e7/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala index 3905442..2fc36b6 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala @@ -29,6 +29,7 @@ import org.apache.s2graph.core.index.IndexProvider import org.apache.s2graph.core.io.tinkerpop.optimize.S2GraphStepStrategy import org.apache.s2graph.core.mysqls._ import org.apache.s2graph.core.storage.hbase.AsynchbaseStorage +import org.apache.s2graph.core.storage.rocks.RocksStorage import org.apache.s2graph.core.storage.{MutateResponse, Storage} import org.apache.s2graph.core.types._ import org.apache.s2graph.core.utils.{DeferCache, Extensions, logger} @@ -37,10 +38,10 @@ import org.apache.tinkerpop.gremlin.structure.{Direction, Edge, Graph} import scala.collection.JavaConversions._ import scala.collection.mutable -import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable.{ArrayBuffer, ListBuffer} import scala.concurrent._ import scala.concurrent.duration.Duration -import scala.util.Try +import scala.util.{Random, Try} object S2Graph { @@ -81,7 +82,9 @@ object S2Graph { "query.future.cache.expire.after.write" -> java.lang.Integer.valueOf(10000), "query.future.cache.expire.after.access" -> java.lang.Integer.valueOf(5000), "query.future.cache.metric.interval" -> java.lang.Integer.valueOf(60000), - "s2graph.storage.backend" -> "hbase", +// "s2graph.storage.backend" -> "hbase", + "s2graph.storage.backend" -> "rocks", + RocksStorage.FilePathKey -> "rocks_db", "query.hardlimit" -> java.lang.Integer.valueOf(100000), "hbase.zookeeper.znode.parent" -> "/hbase", "query.log.sample.rate" -> Double.box(0.05) @@ -140,6 +143,7 @@ object S2Graph { null new AsynchbaseStorage(graph, config) + case "rocks" => new RocksStorage(graph, config) case _ => throw new RuntimeException("not supported storage.") } } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/4ea9a8e7/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageReadable.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageReadable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageReadable.scala index 79ca8aa..0965f68 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageReadable.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageReadable.scala @@ -27,8 +27,8 @@ import org.apache.s2graph.core.utils.logger import scala.concurrent.{ExecutionContext, Future} trait StorageReadable { - val serDe: StorageSerDe val io: StorageIO + val serDe: StorageSerDe /** * responsible to fire parallel fetch call into storage and create future that will return merged result. * http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/4ea9a8e7/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageReadable.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageReadable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageReadable.scala index af82439..b1fdd7c 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageReadable.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageReadable.scala @@ -41,7 +41,7 @@ import scala.concurrent.{ExecutionContext, Future} class AsynchbaseStorageReadable(val graph: S2GraphLike, val config: Config, val client: HBaseClient, - val serDe: StorageSerDe, + override val serDe: StorageSerDe, override val io: StorageIO) extends StorageReadable { import Extensions.DeferOps import CanDefer._ http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/4ea9a8e7/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksHelper.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksHelper.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksHelper.scala new file mode 100644 index 0000000..1054683 --- /dev/null +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksHelper.scala @@ -0,0 +1,45 @@ +package org.apache.s2graph.core.storage.rocks + +import java.nio.{ByteBuffer, ByteOrder} + +import org.apache.s2graph.core.QueryParam + +object RocksHelper { + + def intToBytes(value: Int): Array[Byte] = { + val intBuffer = ByteBuffer.allocate(4).order(ByteOrder.nativeOrder()) + intBuffer.clear() + intBuffer.putInt(value) + intBuffer.array() + } + + def longToBytes(value: Long): Array[Byte] = { + val longBuffer = ByteBuffer.allocate(8).order(ByteOrder.nativeOrder()) + longBuffer.clear() + longBuffer.putLong(value) + longBuffer.array() + } + + def bytesToInt(data: Array[Byte], offset: Int): Int = { + if (data != null) { + val intBuffer = ByteBuffer.allocate(4).order(ByteOrder.nativeOrder()) + intBuffer.put(data, offset, 4) + intBuffer.flip() + intBuffer.getInt() + } else 0 + } + + def bytesToLong(data: Array[Byte], offset: Int): Long = { + if (data != null) { + val longBuffer = ByteBuffer.allocate(8).order(ByteOrder.nativeOrder()) + longBuffer.put(data, offset, 8) + longBuffer.flip() + longBuffer.getLong() + } else 0L + } + + case class ScanWithRange(queryParam: QueryParam, startKey: Array[Byte], stopKey: Array[Byte], offset: Int, limit: Int) + case class GetRequest(cf: Array[Byte], key: Array[Byte]) + + type RocksRPC = Either[GetRequest, ScanWithRange] +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/4ea9a8e7/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorage.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorage.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorage.scala new file mode 100644 index 0000000..c9a0b26 --- /dev/null +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorage.scala @@ -0,0 +1,141 @@ +package org.apache.s2graph.core.storage.rocks + +import java.util.concurrent.TimeUnit +import java.util.concurrent.locks.ReentrantLock + +import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache} +import com.google.common.hash.Hashing +import com.typesafe.config.Config +import org.apache.s2graph.core._ +import org.apache.s2graph.core.storage.Storage +import org.apache.s2graph.core.storage.rocks.RocksHelper.RocksRPC +import org.apache.s2graph.core.utils.logger +import org.rocksdb._ +import org.rocksdb.util.SizeUnit + +import scala.util.{Random, Try} + +object RocksStorage { + + RocksDB.loadLibrary() + + val LockExpireDuration = 60000 + val FilePathKey = "rocks.storage.file.path" + val StorageModeKey = "rocks.storage.mode" + val TtlKey = "rocks.storage.ttl" + val ReadOnlyKey = "rocks.storage.read.only" + val VertexPostfix = "vertex" + val EdgePostfix = "edge" + + val dbPool = CacheBuilder.newBuilder() + .concurrencyLevel(8) + .expireAfterAccess(LockExpireDuration, TimeUnit.MILLISECONDS) + .expireAfterWrite(LockExpireDuration, TimeUnit.MILLISECONDS) + .maximumSize(1000 * 10 * 10 * 10 * 10) + .build[String, (RocksDB, RocksDB)] + +// val writeOptions = new WriteOptions() + + val tableOptions = new BlockBasedTableConfig + tableOptions + .setBlockCacheSize(1 * SizeUnit.GB) + .setCacheIndexAndFilterBlocks(true) + .setHashIndexAllowCollision(false) + + val options = new Options + options + .setCreateIfMissing(true) + .setMergeOperatorName("uint64add") + .setTableCacheNumshardbits(5) + .setIncreaseParallelism(8) + .setArenaBlockSize(1024 * 32) + .setTableFormatConfig(tableOptions) + .setWriteBufferSize(1024 * 1024 * 512) + + def configKey(config: Config): Long = + Hashing.murmur3_128().hashBytes(config.toString.getBytes("UTF-8")).asLong() + + def getFilePath(config: Config): String = config.getString(FilePathKey) + + def getOrElseUpdate(config: Config): (RocksDB, RocksDB) = { + val path = config.getString(FilePathKey) + val storageMode = Try { config.getString(StorageModeKey) }.getOrElse("test") + val ttl = Try { config.getInt(TtlKey) }.getOrElse(-1) + val readOnly = Try { config.getBoolean(ReadOnlyKey) } getOrElse(false) + + val newPair = ( + openDatabase(path, options, VertexPostfix, storageMode, ttl, readOnly), + openDatabase(path, options, EdgePostfix, storageMode, ttl, readOnly) + ) + dbPool.asMap().putIfAbsent(path, newPair) match { + case null => newPair + case old => + newPair._1.close() + newPair._2.close() + old + } + } + + def shutdownAll(): Unit = { + import scala.collection.JavaConversions._ + dbPool.asMap.foreach { case (k, (vdb, edb)) => + vdb.close() + edb.close() + } + } + + private def openDatabase(path: String, + options: Options, + postfix: String = "", + storageMode: String = "production", + ttl: Int = -1, + readOnly: Boolean = false): RocksDB = { + try { + val filePath = s"${path}_${postfix}" + logger.info(s"Open RocksDB: ${filePath}") + + storageMode match { + case "test" => RocksDB.open(options, s"${filePath}_${Random.nextInt()}") + case _ => + if (ttl < 0) { + if (readOnly) RocksDB.openReadOnly(options, filePath) + else RocksDB.open(options, filePath) + } else { + TtlDB.open(options, filePath, ttl, readOnly) + } + } + } catch { + case e: RocksDBException => + logger.error(s"initialize rocks db storage failed.", e) + throw e + } + } +} + +class RocksStorage(override val graph: S2GraphLike, + override val config: Config) extends Storage(graph, config) { + + import RocksStorage._ + var closed = false + + lazy val (vdb, db) = getOrElseUpdate(config) + + val cacheLoader = new CacheLoader[String, ReentrantLock] { + override def load(key: String) = new ReentrantLock() + } + + val lockMap: LoadingCache[String, ReentrantLock] = CacheBuilder.newBuilder() + .concurrencyLevel(8) + .expireAfterAccess(graph.LockExpireDuration, TimeUnit.MILLISECONDS) + .expireAfterWrite(graph.LockExpireDuration, TimeUnit.MILLISECONDS) + .maximumSize(1000 * 10 * 10 * 10 * 10) + .build[String, ReentrantLock](cacheLoader) + + override val management = new RocksStorageManagement(config, vdb, db) + + override val mutator = new RocksStorageWritable(db, vdb, lockMap) + + override val serDe = new RocksStorageSerDe(graph) + + override val fetcher = new RocksStorageReadable(graph, config, db, vdb, serDe, io) +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/4ea9a8e7/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorageManagement.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorageManagement.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorageManagement.scala new file mode 100644 index 0000000..3b9d4f8 --- /dev/null +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorageManagement.scala @@ -0,0 +1,26 @@ +package org.apache.s2graph.core.storage.rocks + +import com.typesafe.config.Config +import org.apache.s2graph.core.storage.StorageManagement +import org.rocksdb.RocksDB + +class RocksStorageManagement(val config: Config, + val vdb: RocksDB, + val db: RocksDB) extends StorageManagement { + val path =RocksStorage.getFilePath(config) + + + override def flush(): Unit = { + vdb.close() + db.close() + RocksStorage.dbPool.asMap().remove(path) + } + + override def createTable(config: Config, tableNameStr: String): Unit = {} + + override def truncateTable(config: Config, tableNameStr: String): Unit = {} + + override def deleteTable(config: Config, tableNameStr: String): Unit = {} + + override def shutdown(): Unit = flush() +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/4ea9a8e7/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorageReadable.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorageReadable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorageReadable.scala new file mode 100644 index 0000000..194b3d2 --- /dev/null +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorageReadable.scala @@ -0,0 +1,189 @@ +package org.apache.s2graph.core.storage.rocks + +import java.util.Base64 + +import com.typesafe.config.Config +import org.apache.hadoop.hbase.util.Bytes +import org.apache.s2graph.core._ +import org.apache.s2graph.core.mysqls.{Label, ServiceColumn} +import org.apache.s2graph.core.storage.rocks.RocksHelper.{GetRequest, RocksRPC, ScanWithRange} +import org.apache.s2graph.core.storage.serde.StorageSerializable +import org.apache.s2graph.core.storage.{SKeyValue, StorageIO, StorageReadable, StorageSerDe} +import org.apache.s2graph.core.types.{HBaseType, VertexId} +import org.rocksdb.RocksDB + +import scala.collection.mutable.ArrayBuffer +import scala.concurrent.{ExecutionContext, Future} + +class RocksStorageReadable(val graph: S2GraphLike, + val config: Config, + val db: RocksDB, + val vdb: RocksDB, + val serDe: StorageSerDe, + override val io: StorageIO) extends StorageReadable { + + private val table = Array.emptyByteArray + private val qualifier = Array.emptyByteArray + + private def buildRequest(queryRequest: QueryRequest, edge: S2EdgeLike): RocksRPC = { + queryRequest.queryParam.tgtVertexInnerIdOpt match { + case None => // indexEdges + val queryParam = queryRequest.queryParam + val indexEdgeOpt = edge.edgesWithIndex.filter(edgeWithIndex => edgeWithIndex.labelIndex.seq == queryParam.labelOrderSeq).headOption + val indexEdge = indexEdgeOpt.getOrElse(throw new RuntimeException(s"Can`t find index for query $queryParam")) + val srcIdBytes = VertexId.toSourceVertexId(indexEdge.srcVertex.id).bytes + val labelWithDirBytes = indexEdge.labelWithDir.bytes + val labelIndexSeqWithIsInvertedBytes = StorageSerializable.labelOrderSeqWithIsInverted(indexEdge.labelIndexSeq, isInverted = false) + + val baseKey = Bytes.add(srcIdBytes, labelWithDirBytes, labelIndexSeqWithIsInvertedBytes) + + val (intervalMaxBytes, intervalMinBytes) = queryParam.buildInterval(Option(edge)) + val (startKey, stopKey) = + if (queryParam.intervalOpt.isDefined) { + val _startKey = queryParam.cursorOpt match { + case Some(cursor) => Base64.getDecoder.decode(cursor) + case None => Bytes.add(baseKey, intervalMaxBytes) + } + (_startKey, Bytes.add(baseKey, intervalMinBytes)) + } else { + val _startKey = queryParam.cursorOpt match { + case Some(cursor) => Base64.getDecoder.decode(cursor) + case None => baseKey + } + (_startKey, Bytes.add(baseKey, Array.fill(1)(-1))) + } + + Right(ScanWithRange(queryParam, startKey, stopKey, queryParam.offset, queryParam.limit)) + + case Some(tgtId) => // snapshotEdge + val kv = serDe.snapshotEdgeSerializer(graph.elementBuilder.toRequestEdge(queryRequest, Nil).toSnapshotEdge).toKeyValues.head + Left(GetRequest(SKeyValue.EdgeCf, kv.row)) + } + } + + private def buildRequest(queryRequest: QueryRequest, vertex: S2VertexLike): RocksRPC = { + val kv = serDe.vertexSerializer(vertex).toKeyValues.head + Left(GetRequest(SKeyValue.VertexCf, kv.row)) + } + + override def fetches(queryRequests: Seq[QueryRequest], prevStepEdges: Map[VertexId, Seq[EdgeWithScore]])(implicit ec: ExecutionContext): Future[Seq[StepResult]] = { + val futures = for { + queryRequest <- queryRequests + } yield { + val parentEdges = prevStepEdges.getOrElse(queryRequest.vertex.id, Nil) + val edge = graph.elementBuilder.toRequestEdge(queryRequest, parentEdges) + val rpc = buildRequest(queryRequest, edge) + fetchKeyValues(rpc).map { kvs => + val queryParam = queryRequest.queryParam + val stepResult = io.toEdges(kvs, queryRequest, queryRequest.prevStepScore, false, parentEdges) + val edgeWithScores = stepResult.edgeWithScores.filter { case edgeWithScore => + val edge = edgeWithScore.edge + val duration = queryParam.durationOpt.getOrElse((Long.MinValue, Long.MaxValue)) + edge.ts >= duration._1 && edge.ts < duration._2 + } + + stepResult.copy(edgeWithScores = edgeWithScores) + } + } + + Future.sequence(futures) + } + + private def fetchKeyValues(rpc: RocksRPC)(implicit ec: ExecutionContext): Future[Seq[SKeyValue]] = { + rpc match { + case Left(GetRequest(cf, key)) => + val _db = if (Bytes.equals(cf, SKeyValue.VertexCf)) vdb else db + val v = _db.get(key) + + val kvs = + if (v == null) Seq.empty + else Seq(SKeyValue(table, key, cf, qualifier, v, System.currentTimeMillis())) + + Future.successful(kvs) + case Right(ScanWithRange(queryParam, startKey, stopKey, offset, limit)) => + val kvs = new ArrayBuffer[SKeyValue]() + val iter = db.newIterator() + try { + var idx = 0 + iter.seek(startKey) + val (startOffset, len) = (queryParam.innerOffset, queryParam.innerLimit) + while (iter.isValid && Bytes.compareTo(iter.key, stopKey) <= 0 && idx < startOffset + len) { + if (idx >= startOffset) { + kvs += SKeyValue(table, iter.key, SKeyValue.EdgeCf, qualifier, iter.value, System.currentTimeMillis()) + } + + iter.next() + idx += 1 + } + } finally { + iter.close() + } + + Future.successful(kvs) + } + } + + override def fetchEdgesAll()(implicit ec: ExecutionContext) = { + val edges = new ArrayBuffer[S2EdgeLike]() + Label.findAll().groupBy(_.hbaseTableName).toSeq.foreach { case (hTableName, labels) => + val distinctLabels = labels.toSet + + val iter = db.newIterator() + try { + iter.seekToFirst() + while (iter.isValid) { + val kv = SKeyValue(table, iter.key(), SKeyValue.EdgeCf, qualifier, iter.value, System.currentTimeMillis()) + + serDe.indexEdgeDeserializer(schemaVer = HBaseType.DEFAULT_VERSION).fromKeyValues(Seq(kv), None) + .filter(e => distinctLabels(e.innerLabel) && e.getDirection() == "out" && !e.isDegree) + .foreach { edge => + edges += edge + } + + + iter.next() + } + + } finally { + iter.close() + } + } + + Future.successful(edges) + } + + override def fetchVerticesAll()(implicit ec: ExecutionContext) = { + val vertices = new ArrayBuffer[S2VertexLike]() + ServiceColumn.findAll().groupBy(_.service.hTableName).toSeq.foreach { case (hTableName, columns) => + val distinctColumns = columns.toSet + + val iter = vdb.newIterator() + try { + iter.seekToFirst() + while (iter.isValid) { + val kv = SKeyValue(table, iter.key(), SKeyValue.VertexCf, qualifier, iter.value(), System.currentTimeMillis()) + + serDe.vertexDeserializer(schemaVer = HBaseType.DEFAULT_VERSION).fromKeyValues(Seq(kv), None) + .filter(v => distinctColumns(v.serviceColumn)) + .foreach { vertex => + vertices += vertex + } + + iter.next() + } + } finally { + iter.close() + } + } + + Future.successful(vertices) + } + + override def fetchKeyValues(queryRequest: QueryRequest, edge: S2EdgeLike)(implicit ec: ExecutionContext) = { + fetchKeyValues(buildRequest(queryRequest, edge)) + } + + override def fetchKeyValues(queryRequest: QueryRequest, vertex: S2VertexLike)(implicit ec: ExecutionContext) = { + fetchKeyValues(buildRequest(queryRequest, vertex)) + } +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/4ea9a8e7/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorageSerDe.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorageSerDe.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorageSerDe.scala new file mode 100644 index 0000000..833c857 --- /dev/null +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorageSerDe.scala @@ -0,0 +1,33 @@ +package org.apache.s2graph.core.storage.rocks + +import org.apache.s2graph.core._ +import org.apache.s2graph.core.storage.{StorageIO, StorageSerDe, serde} +import org.apache.s2graph.core.types.HBaseType + +class RocksStorageSerDe(val graph: S2GraphLike) extends StorageSerDe { + + override def snapshotEdgeSerializer(snapshotEdge: SnapshotEdge) = + new serde.snapshotedge.tall.SnapshotEdgeSerializable(snapshotEdge) + + override def indexEdgeSerializer(indexEdge: IndexEdge) = + new serde.indexedge.tall.IndexEdgeSerializable(indexEdge, RocksHelper.longToBytes) + + override def vertexSerializer(vertex: S2VertexLike) = + new serde.vertex.tall.VertexSerializable(vertex, RocksHelper.intToBytes) + + + private val snapshotEdgeDeserializer = new serde.snapshotedge.tall.SnapshotEdgeDeserializable(graph) + override def snapshotEdgeDeserializer(schemaVer: String) = snapshotEdgeDeserializer + + private val indexEdgeDeserializable = + new serde.indexedge.tall.IndexEdgeDeserializable(graph, + RocksHelper.bytesToLong, tallSchemaVersions = HBaseType.ValidVersions.toSet) + + override def indexEdgeDeserializer(schemaVer: String) = indexEdgeDeserializable + + private val vertexDeserializer = + new serde.vertex.tall.VertexDeserializable(graph) + + override def vertexDeserializer(schemaVer: String) = vertexDeserializer + +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/4ea9a8e7/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorageWritable.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorageWritable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorageWritable.scala new file mode 100644 index 0000000..1057781 --- /dev/null +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorageWritable.scala @@ -0,0 +1,110 @@ +package org.apache.s2graph.core.storage.rocks + +import java.util.concurrent.locks.ReentrantLock + +import com.google.common.cache.{Cache, LoadingCache} +import org.apache.hadoop.hbase.util.Bytes +import org.apache.s2graph.core.storage.{MutateResponse, SKeyValue, StorageWritable} +import org.apache.s2graph.core.utils.logger +import org.rocksdb.{RocksDB, RocksDBException, WriteBatch, WriteOptions} + +import scala.concurrent.{ExecutionContext, Future} + +class RocksStorageWritable(val db: RocksDB, + val vdb: RocksDB, + val lockMap: LoadingCache[String, ReentrantLock]) extends StorageWritable { + + override def writeToStorage(cluster: String, kvs: Seq[SKeyValue], withWait: Boolean)(implicit ec: ExecutionContext) = { + if (kvs.isEmpty) { + Future.successful(MutateResponse.Success) + } else { + val ret = { + val (kvsV, kvsE) = kvs.partition(kv => Bytes.equals(kv.cf, SKeyValue.VertexCf)) + val writeBatchV = buildWriteBatch(kvsV) + val writeBatchE = buildWriteBatch(kvsE) + val writeOptions = new WriteOptions + try { + vdb.write(writeOptions, writeBatchV) + db.write(writeOptions, writeBatchE) + true + } catch { + case e: Exception => + logger.error(s"writeAsyncSimple failed.", e) + false + } finally { + writeBatchV.close() + writeBatchE.close() + writeOptions.close() + } + } + + Future.successful(new MutateResponse(ret)) + } + } + + + override def writeLock(requestKeyValue: SKeyValue, expectedOpt: Option[SKeyValue])(implicit ec: ExecutionContext) = { + def op = { + val writeOptions = new WriteOptions + try { + val fetchedValue = db.get(requestKeyValue.row) + val innerRet = expectedOpt match { + case None => + if (fetchedValue == null) { + + db.put(writeOptions, requestKeyValue.row, requestKeyValue.value) + true + } else { + false + } + case Some(kv) => + if (fetchedValue == null) { + false + } else { + if (Bytes.compareTo(fetchedValue, kv.value) == 0) { + db.put(writeOptions, requestKeyValue.row, requestKeyValue.value) + true + } else { + false + } + } + } + + Future.successful(new MutateResponse(innerRet)) + } catch { + case e: RocksDBException => + logger.error(s"Write lock failed", e) + Future.successful(MutateResponse.Failure) + } finally { + writeOptions.close() + } + } + + withLock(requestKeyValue.row)(op) + } + + private def buildWriteBatch(kvs: Seq[SKeyValue]): WriteBatch = { + val writeBatch = new WriteBatch() + kvs.foreach { kv => + kv.operation match { + case SKeyValue.Put => writeBatch.put(kv.row, kv.value) + case SKeyValue.Delete => writeBatch.remove(kv.row) + case SKeyValue.Increment => writeBatch.merge(kv.row, kv.value) + case _ => throw new RuntimeException(s"not supported rpc operation. ${kv.operation}") + } + } + writeBatch + } + + private def withLock[A](key: Array[Byte])(op: => A): A = { + val lockKey = Bytes.toString(key) + val lock = lockMap.get(lockKey) + + try { + lock.lock + op + } finally { + lock.unlock() + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/4ea9a8e7/s2core/src/test/scala/org/apache/s2graph/core/storage/rocks/RocksStorageTest.scala ---------------------------------------------------------------------- diff --git a/s2core/src/test/scala/org/apache/s2graph/core/storage/rocks/RocksStorageTest.scala b/s2core/src/test/scala/org/apache/s2graph/core/storage/rocks/RocksStorageTest.scala new file mode 100644 index 0000000..0f36b9d --- /dev/null +++ b/s2core/src/test/scala/org/apache/s2graph/core/storage/rocks/RocksStorageTest.scala @@ -0,0 +1,33 @@ +package org.apache.s2graph.core.storage.rocks + +import org.apache.s2graph.core.TestCommonWithModels +import org.apache.s2graph.core.mysqls.{Service, ServiceColumn} +import org.apache.tinkerpop.gremlin.structure.T +import org.scalatest.{FunSuite, Matchers} + +import scala.collection.JavaConversions._ + +class RocksStorageTest extends FunSuite with Matchers with TestCommonWithModels { + initTests() + +// test("VertexTest: shouldNotGetConcurrentModificationException()") { +// val service = Service.findByName(serviceName, useCache = false).getOrElse { +// throw new IllegalStateException("service not found.") +// } +// val column = ServiceColumn.find(service.id.get, columnName).getOrElse { +// throw new IllegalStateException("column not found.") +// } +// +// val vertexId = graph.elementBuilder.newVertexId(service, column, 1L) +// +// val vertex = graph.elementBuilder.newVertex(vertexId) +// for (i <- (0 until 10)) { +// vertex.addEdge(labelName, vertex) +// } +// +// println(graph.edges().toSeq) +// println("*" * 100) +// vertex.remove() +// println(graph.vertices().toSeq) +// } +}
