http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/66bdf1bc/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageDeserializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageDeserializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageDeserializable.scala
index 69926fa..d2a7de7 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageDeserializable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageDeserializable.scala
@@ -21,6 +21,7 @@ package org.apache.s2graph.core.storage
 
 import org.apache.hadoop.hbase.util.Bytes
 import org.apache.s2graph.core.QueryParam
+import org.apache.s2graph.core.mysqls.{LabelMeta, Label}
 import org.apache.s2graph.core.types.{HBaseType, InnerVal, InnerValLike, InnerValLikeWithTs}
 import org.apache.s2graph.core.utils.logger
 
@@ -36,16 +37,17 @@ object StorageDeserializable {
   def bytesToKeyValues(bytes: Array[Byte],
                        offset: Int,
                        length: Int,
-                       version: String): (Array[(Byte, InnerValLike)], Int) = {
+                       schemaVer: String,
+                       label: Label): (Array[(LabelMeta, InnerValLike)], Int) = {
     var pos = offset
     val len = bytes(pos)
     pos += 1
-    val kvs = new Array[(Byte, InnerValLike)](len)
+    val kvs = new Array[(LabelMeta, InnerValLike)](len)
     var i = 0
     while (i < len) {
-      val k = bytes(pos)
+      val k = label.labelMetaMap(bytes(pos))
       pos += 1
-      val (v, numOfBytesUsed) = InnerVal.fromBytes(bytes, pos, 0, version)
+      val (v, numOfBytesUsed) = InnerVal.fromBytes(bytes, pos, 0, schemaVer)
       pos += numOfBytesUsed
       kvs(i) = (k -> v)
       i += 1
@@ -57,16 +59,17 @@ object StorageDeserializable {
 
   def bytesToKeyValuesWithTs(bytes: Array[Byte],
                              offset: Int,
-                             version: String): (Array[(Byte, InnerValLikeWithTs)], Int) = {
+                             schemaVer: String,
+                             label: Label): (Array[(LabelMeta, InnerValLikeWithTs)], Int) = {
     var pos = offset
     val len = bytes(pos)
     pos += 1
-    val kvs = new Array[(Byte, InnerValLikeWithTs)](len)
+    val kvs = new Array[(LabelMeta, InnerValLikeWithTs)](len)
     var i = 0
     while (i < len) {
-      val k = bytes(pos)
+      val k = label.labelMetaMap(bytes(pos))
       pos += 1
-      val (v, numOfBytesUsed) = InnerValLikeWithTs.fromBytes(bytes, pos, 0, version)
+      val (v, numOfBytesUsed) = InnerValLikeWithTs.fromBytes(bytes, pos, 0, schemaVer)
       pos += numOfBytesUsed
       kvs(i) = (k -> v)
       i += 1
@@ -78,15 +81,15 @@ object StorageDeserializable {
 
   def bytesToProps(bytes: Array[Byte],
                    offset: Int,
-                   version: String): (Array[(Byte, InnerValLike)], Int) = {
+                   schemaVer: String): (Array[(LabelMeta, InnerValLike)], Int) = {
     var pos = offset
     val len = bytes(pos)
     pos += 1
-    val kvs = new Array[(Byte, InnerValLike)](len)
+    val kvs = new Array[(LabelMeta, InnerValLike)](len)
     var i = 0
     while (i < len) {
-      val k = HBaseType.EMPTY_SEQ_BYTE
-      val (v, numOfBytesUsed) = InnerVal.fromBytes(bytes, pos, 0, version)
+      val k = LabelMeta.empty
+      val (v, numOfBytesUsed) = InnerVal.fromBytes(bytes, pos, 0, schemaVer)
       pos += numOfBytesUsed
       kvs(i) = (k -> v)
       i += 1
@@ -98,17 +101,19 @@ object StorageDeserializable {
   }
 
   def bytesToLong(bytes: Array[Byte], offset: Int): Long = Bytes.toLong(bytes, offset)
+
+  def bytesToInt(bytes: Array[Byte], offset: Int): Int = Bytes.toInt(bytes, offset)
 }
 
 trait StorageDeserializable[E] {
-  def fromKeyValues[T: CanSKeyValue](queryParam: QueryParam, kvs: Seq[T], version: String, cacheElementOpt: Option[E]): Option[E] = {
+  def fromKeyValues[T: CanSKeyValue](checkLabel: Option[Label], kvs: Seq[T], version: String, cacheElementOpt: Option[E]): Option[E] = {
     try {
-      Option(fromKeyValuesInner(queryParam, kvs, version, cacheElementOpt))
+      Option(fromKeyValuesInner(checkLabel, kvs, version, cacheElementOpt))
     } catch {
      case e: Exception =>
        logger.error(s"${this.getClass.getName} fromKeyValues failed.", e)
        None
    }
  }
 
-  def fromKeyValuesInner[T: CanSKeyValue](queryParam: QueryParam, kvs: Seq[T], version: String, cacheElementOpt: Option[E]): E
+  def fromKeyValuesInner[T: CanSKeyValue](checkLabel: Option[Label], kvs: Seq[T], version: String, cacheElementOpt: Option[E]): E
 }
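
The codec above keeps the on-disk layout unchanged: a property block is one count byte followed by, per property, a one-byte seq and the value's bytes; the only behavioral change is that the deserializer now resolves each seq byte to a LabelMeta through label.labelMetaMap. A minimal, self-contained sketch of that layout (Meta and the fixed-width Long value are illustrative stand-ins for LabelMeta and InnerValLike, not the real s2graph classes):

object PropsCodecSketch {
  // Layout assumed here: [count: 1 byte][seq: 1 byte][value: 8 bytes]...
  case class Meta(seq: Byte, name: String)

  def encode(props: Seq[(Meta, Long)]): Array[Byte] = {
    require(props.length < Byte.MaxValue)
    val out = java.nio.ByteBuffer.allocate(1 + props.length * 9)
    out.put(props.length.toByte)
    for ((meta, value) <- props) {   // like propsToKeyValues: write meta.seq, not the Meta itself
      out.put(meta.seq)
      out.putLong(value)
    }
    out.array()
  }

  def decode(bytes: Array[Byte], metaBySeq: Map[Byte, Meta]): Seq[(Meta, Long)] = {
    val in = java.nio.ByteBuffer.wrap(bytes)
    val len = in.get()
    (0 until len).map { _ =>
      val meta = metaBySeq(in.get())  // resolve seq -> Meta, like label.labelMetaMap(bytes(pos))
      (meta, in.getLong())
    }
  }

  def main(args: Array[String]): Unit = {
    val ts = Meta(0, "timestamp"); val weight = Meta(1, "weight")
    val metas = Map(ts.seq -> ts, weight.seq -> weight)
    val roundTrip = decode(encode(Seq(ts -> 10L, weight -> 3L)), metas)
    assert(roundTrip.map(_._1.name) == Seq("timestamp", "weight"))
  }
}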
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/66bdf1bc/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageSerializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageSerializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageSerializable.scala
index b7326f5..c1efe7b 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageSerializable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageSerializable.scala
@@ -20,31 +20,33 @@ package org.apache.s2graph.core.storage
 
 import org.apache.hadoop.hbase.util.Bytes
+import org.apache.s2graph.core.mysqls.LabelMeta
 import org.apache.s2graph.core.types.{InnerValLike, InnerValLikeWithTs}
+import org.apache.s2graph.core.utils.logger
 
 object StorageSerializable {
   /** serializer */
-  def propsToBytes(props: Seq[(Byte, InnerValLike)]): Array[Byte] = {
+  def propsToBytes(props: Seq[(LabelMeta, InnerValLike)]): Array[Byte] = {
     val len = props.length
     assert(len < Byte.MaxValue)
     var bytes = Array.fill(1)(len.toByte)
-    for ((k, v) <- props) bytes = Bytes.add(bytes, v.bytes)
+    for ((_, v) <- props) bytes = Bytes.add(bytes, v.bytes)
     bytes
   }
 
-  def propsToKeyValues(props: Seq[(Byte, InnerValLike)]): Array[Byte] = {
+  def propsToKeyValues(props: Seq[(LabelMeta, InnerValLike)]): Array[Byte] = {
     val len = props.length
     assert(len < Byte.MaxValue)
     var bytes = Array.fill(1)(len.toByte)
-    for ((k, v) <- props) bytes = Bytes.add(bytes, Array.fill(1)(k), v.bytes)
+    for ((k, v) <- props) bytes = Bytes.add(bytes, Array.fill(1)(k.seq), v.bytes)
     bytes
   }
 
-  def propsToKeyValuesWithTs(props: Seq[(Byte, InnerValLikeWithTs)]): Array[Byte] = {
+  def propsToKeyValuesWithTs(props: Seq[(LabelMeta, InnerValLikeWithTs)]): Array[Byte] = {
     val len = props.length
     assert(len < Byte.MaxValue)
     var bytes = Array.fill(1)(len.toByte)
-    for ((k, v) <- props) bytes = Bytes.add(bytes, Array.fill(1)(k), v.bytes)
+    for ((k, v) <- props) bytes = Bytes.add(bytes, Array.fill(1)(k.seq), v.bytes)
     bytes
   }
 
@@ -53,13 +55,17 @@ object StorageSerializable {
     val byte = labelOrderSeq << 1 | (if (isInverted) 1 else 0)
     Array.fill(1)(byte.toByte)
   }
+
+  def intToBytes(value: Int): Array[Byte] = Bytes.toBytes(value)
+
+  def longToBytes(value: Long): Array[Byte] = Bytes.toBytes(value)
 }
 
 trait StorageSerializable[E] {
   val cf = Serializable.edgeCf
 
-  val table: Array[Byte]
-  val ts: Long
+  def table: Array[Byte]
+  def ts: Long
 
   def toRowKey: Array[Byte]
   def toQualifier: Array[Byte]
@@ -70,7 +76,7 @@ trait StorageSerializable[E] {
     val qualifier = toQualifier
     val value = toValue
     val kv = SKeyValue(table, row, cf, qualifier, value, ts)
-
+//    logger.debug(s"[SER]: ${kv.toLogString}}")
     Seq(kv)
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/66bdf1bc/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
index b52ba53..e63dfea 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
@@ -19,11 +19,13 @@ package org.apache.s2graph.core.storage.hbase
 
+
+
 import java.util
 import java.util.Base64
 
-import com.stumbleupon.async.Deferred
-import com.typesafe.config.{Config, ConfigFactory}
+import com.stumbleupon.async.{Callback, Deferred}
+import com.typesafe.config.Config
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.hbase.client.{ConnectionFactory, Durability}
 import org.apache.hadoop.hbase.io.compress.Compression.Algorithm
@@ -35,15 +37,17 @@ import org.apache.hadoop.security.UserGroupInformation
 import org.apache.s2graph.core._
 import org.apache.s2graph.core.mysqls.LabelMeta
 import org.apache.s2graph.core.storage._
+import org.apache.s2graph.core.storage.hbase.AsynchbaseStorage.{AsyncRPC, ScanWithRange}
 import org.apache.s2graph.core.types.{HBaseType, VertexId}
 import org.apache.s2graph.core.utils._
 import org.hbase.async.FilterList.Operator.MUST_PASS_ALL
 import org.hbase.async._
 
 import scala.collection.JavaConversions._
-import scala.collection.{Map, Seq}
-import scala.concurrent.duration.Duration
+import scala.collection.mutable.ArrayBuffer
 import scala.concurrent._
+import scala.concurrent.duration.Duration
+import scala.util.Try
 import scala.util.hashing.MurmurHash3
 
@@ -79,12 +83,15 @@ object AsynchbaseStorage {
     logger.info(s"Asynchbase: ${client.getConfig.dumpConfiguration()}")
     client
   }
+
+  case class ScanWithRange(scan: Scanner, offset: Int, limit: Int)
+
+  type AsyncRPC = Either[GetRequest, ScanWithRange]
 }
 
 
 class AsynchbaseStorage(override val graph: Graph,
                         override val config: Config)(implicit ec: ExecutionContext)
-  extends Storage[Deferred[StepInnerResult]](graph, config) {
+  extends Storage[AsyncRPC, Deferred[StepResult]](graph, config) {
 
   import Extensions.DeferOps
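
The new AsyncRPC alias makes the two read paths explicit in the types: a point lookup is a Left(GetRequest) and a range scan with client-side paging is a Right(ScanWithRange). A small sketch of the same Either-dispatch pattern using local stub types (GetReq, Scan and their fields are assumptions, not the Asynchbase classes):

object AsyncRpcSketch {
  case class GetReq(key: String)
  case class Scan(startKey: String, stopKey: String)
  case class ScanWithRange(scan: Scan, offset: Int, limit: Int)
  type AsyncRPC = Either[GetReq, ScanWithRange]

  // One entry point, two shapes of request; the compiler forces both cases to be handled.
  def describe(rpc: AsyncRPC): String = rpc match {
    case Left(get)                         => s"GET ${get.key}"
    case Right(ScanWithRange(s, off, lim)) => s"SCAN [${s.startKey}, ${s.stopKey}) offset=$off limit=$lim"
  }

  def main(args: Array[String]): Unit = {
    println(describe(Left(GetReq("row1"))))
    println(describe(Right(ScanWithRange(Scan("a", "b"), 0, 100))))
  }
}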
@@ -92,26 +99,41 @@ class AsynchbaseStorage(override val graph: Graph,
    * Asynchbase client setup.
    * note that we need two client, one for bulk(withWait=false) and another for withWait=true
    */
-  val configWithFlush = config.withFallback(ConfigFactory.parseMap(Map("hbase.rpcs.buffered_flush_interval" -> "0")))
-  val client = AsynchbaseStorage.makeClient(config)
-
-  private val clientWithFlush = AsynchbaseStorage.makeClient(config, "hbase.rpcs.buffered_flush_interval" -> "0")
-  private val clients = Seq(client, clientWithFlush)
   private val clientFlushInterval = config.getInt("hbase.rpcs.buffered_flush_interval").toString().toShort
+
+  /**
+   * since some runtime environment such as spark cluster has issue with guava version, that is used in Asynchbase.
+   * to fix version conflict, make this as lazy val for clients that don't require hbase client.
+   */
+  lazy val client = AsynchbaseStorage.makeClient(config)
+  lazy val clientWithFlush = AsynchbaseStorage.makeClient(config, "hbase.rpcs.buffered_flush_interval" -> "0")
+  lazy val clients = Seq(client, clientWithFlush)
+
+  private val emptyKeyValues = new util.ArrayList[KeyValue]()
+  private val emptyStepResult = new util.ArrayList[StepResult]()
+
+  private def client(withWait: Boolean): HBaseClient = if (withWait) clientWithFlush else client
+
+  import CanDefer._
+
   /** Future Cache to squash request */
-  private val futureCache = new DeferCache[StepInnerResult, Deferred, Deferred](config, StepInnerResult.Empty, "FutureCache", useMetric = true)
+  lazy private val futureCache = new DeferCache[StepResult, Deferred, Deferred](config, StepResult.Empty, "AsyncHbaseFutureCache", useMetric = true)
 
   /** Simple Vertex Cache */
-  private val vertexCache = new DeferCache[Seq[SKeyValue], Promise, Future](config, Seq.empty[SKeyValue])
+  lazy private val vertexCache = new DeferCache[Seq[SKeyValue], Promise, Future](config, Seq.empty[SKeyValue])
 
   private val zkQuorum = config.getString("hbase.zookeeper.quorum")
   private val zkQuorumSlave =
-    if (config.hasPath("hbase.zookeeper.quorum")) Option(config.getString("hbase.zookeeper.quorum"))
+    if (config.hasPath("hbase.slave.zookeeper.quorum")) Option(config.getString("hbase.slave.zookeeper.quorum"))
     else None
 
+  /** v4 max next row size */
+  private val v4_max_num_rows = 10000
+  private def getV4MaxNumRows(limit : Int): Int = {
+    if (limit < v4_max_num_rows) limit
+    else v4_max_num_rows
+  }
+
   /**
    * fire rpcs into proper hbase cluster using client and
    * return true on all mutation success. otherwise return false.
@@ -120,37 +142,75 @@ class AsynchbaseStorage(override val graph: Graph,
     if (kvs.isEmpty) Future.successful(true)
     else {
       val _client = client(withWait)
-      val futures = kvs.map { kv =>
-        val _defer = kv.operation match {
-          case SKeyValue.Put => _client.put(new PutRequest(kv.table, kv.row, kv.cf, kv.qualifier, kv.value, kv.timestamp))
-          case SKeyValue.Delete =>
-            if (kv.qualifier == null) _client.delete(new DeleteRequest(kv.table, kv.row, kv.cf, kv.timestamp))
-            else _client.delete(new DeleteRequest(kv.table, kv.row, kv.cf, kv.qualifier, kv.timestamp))
-          case SKeyValue.Increment =>
-            _client.atomicIncrement(new AtomicIncrementRequest(kv.table, kv.row, kv.cf, kv.qualifier, Bytes.toLong(kv.value)))
-        }
-        val future = _defer.withCallback { ret => true }.recoverWith { ex =>
+      val (increments, putAndDeletes) = kvs.partition(_.operation == SKeyValue.Increment)
+
+      /** Asynchbase IncrementRequest does not implement HasQualifiers */
+      val incrementsFutures = increments.map { kv =>
+        val inc = new AtomicIncrementRequest(kv.table, kv.row, kv.cf, kv.qualifier, Bytes.toLong(kv.value))
+        val defer = _client.atomicIncrement(inc)
+        val future = defer.toFuture(Long.box(0)).map(_ => true).recover { case ex: Exception =>
           logger.error(s"mutation failed. $kv", ex)
           false
-        }.toFuture
-
+        }
         if (withWait) future else Future.successful(true)
       }
 
-      Future.sequence(futures).map(_.forall(identity))
+      /** PutRequest and DeleteRequest accept byte[][] qualifiers/values. */
+      val othersFutures = putAndDeletes.groupBy { kv =>
+        (kv.table.toSeq, kv.row.toSeq, kv.cf.toSeq, kv.operation, kv.timestamp)
+      }.map { case ((table, row, cf, operation, timestamp), groupedKeyValues) =>
+
+        val durability = groupedKeyValues.head.durability
+        val qualifiers = new ArrayBuffer[Array[Byte]]()
+        val values = new ArrayBuffer[Array[Byte]]()
+
+        groupedKeyValues.foreach { kv =>
+          if (kv.qualifier != null) qualifiers += kv.qualifier
+          if (kv.value != null) values += kv.value
+        }
+
+        val defer = operation match {
+          case SKeyValue.Put =>
+            val put = new PutRequest(table.toArray, row.toArray, cf.toArray, qualifiers.toArray, values.toArray, timestamp)
+            put.setDurable(durability)
+            _client.put(put)
+          case SKeyValue.Delete =>
+            val delete =
+              if (qualifiers.isEmpty)
+                new DeleteRequest(table.toArray, row.toArray, cf.toArray, timestamp)
+              else
+                new DeleteRequest(table.toArray, row.toArray, cf.toArray, qualifiers.toArray, timestamp)
+            delete.setDurable(durability)
+            _client.delete(delete)
+        }
+        if (withWait) {
+          defer.toFuture(new AnyRef()).map(_ => true).recover { case ex: Exception =>
+            groupedKeyValues.foreach { kv => logger.error(s"mutation failed. $kv", ex) }
+            false
+          }
+        } else Future.successful(true)
+      }
+
+      for {
+        incrementRets <- Future.sequence(incrementsFutures)
+        otherRets <- Future.sequence(othersFutures)
+      } yield (incrementRets ++ otherRets).forall(identity)
     }
   }
 
-  override def fetchSnapshotEdgeKeyValues(hbaseRpc: AnyRef): Future[Seq[SKeyValue]] = {
-    val defer = fetchKeyValuesInner(hbaseRpc)
-    defer.toFuture.map { kvsArr =>
+  private def fetchKeyValues(rpc: AsyncRPC): Future[Seq[SKeyValue]] = {
+    val defer = fetchKeyValuesInner(rpc)
+    defer.toFuture(emptyKeyValues).map { kvsArr =>
       kvsArr.map { kv =>
         implicitly[CanSKeyValue[KeyValue]].toSKeyValue(kv)
-      } toSeq
+      }
     }
   }
 
+  override def fetchSnapshotEdgeKeyValues(queryRequest: QueryRequest): Future[Seq[SKeyValue]] = {
+    val edge = toRequestEdge(queryRequest, Nil)
+    val rpc = buildRequest(queryRequest, edge)
+    fetchKeyValues(rpc)
+  }
+
   /**
    * since HBase natively provide CheckAndSet on storage level, implementation becomes simple.
    * @param rpc: key value that is need to be stored on storage.
@@ -162,7 +222,7 @@ class AsynchbaseStorage(override val graph: Graph,
   override def writeLock(rpc: SKeyValue, expectedOpt: Option[SKeyValue]): Future[Boolean] = {
     val put = new PutRequest(rpc.table, rpc.row, rpc.cf, rpc.qualifier, rpc.value, rpc.timestamp)
     val expected = expectedOpt.map(_.value).getOrElse(Array.empty)
-    client(withWait = true).compareAndSet(put, expected).withCallback(ret => ret.booleanValue()).toFuture
+    client(withWait = true).compareAndSet(put, expected).map(true.booleanValue())(ret => ret.booleanValue()).toFuture(true)
   }
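
The rewritten writeToStorage splits increments out (Asynchbase's AtomicIncrementRequest carries a single qualifier) and folds puts/deletes that share (table, row, cf, operation, timestamp) into one multi-qualifier request, cutting the RPC count per row. A simplified, dependency-free model of that grouping (KV, Rpc and friends are illustrative types, not SKeyValue or the Asynchbase request classes):

object BatchMutationsSketch {
  sealed trait Op; case object Put extends Op; case object Delete extends Op; case object Increment extends Op
  case class KV(table: String, row: String, cf: String, qualifier: String, value: String, ts: Long, op: Op)

  sealed trait Rpc
  case class IncrementRpc(kv: KV) extends Rpc
  case class MultiRpc(table: String, row: String, cf: String, op: Op, ts: Long,
                      qualifiers: Seq[String], values: Seq[String]) extends Rpc

  def toRpcs(kvs: Seq[KV]): Seq[Rpc] = {
    // Increments stay individual; puts/deletes sharing the same coordinates collapse.
    val (increments, putAndDeletes) = kvs.partition(_.op == Increment)
    val grouped = putAndDeletes.groupBy(kv => (kv.table, kv.row, kv.cf, kv.op, kv.ts)).map {
      case ((table, row, cf, op, ts), group) =>
        MultiRpc(table, row, cf, op, ts, group.map(_.qualifier), group.map(_.value))
    }
    increments.map(IncrementRpc) ++ grouped
  }

  def main(args: Array[String]): Unit = {
    val kvs = Seq(
      KV("t", "r1", "e", "q1", "v1", 1L, Put),
      KV("t", "r1", "e", "q2", "v2", 1L, Put),
      KV("t", "r1", "e", "cnt", "1", 1L, Increment))
    // Two puts on the same row collapse into one MultiRpc; the increment stays alone.
    assert(toRpcs(kvs).size == 2)
  }
}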
@@ -182,24 +242,24 @@ class AsynchbaseStorage(override val graph: Graph,
    * @param queryRequest
    * @return Scanner or GetRequest with proper setup with StartKey, EndKey, RangeFilter.
    */
-  override def buildRequest(queryRequest: QueryRequest): AnyRef = {
+  override def buildRequest(queryRequest: QueryRequest, edge: Edge): AsyncRPC = {
     import Serializable._
     val queryParam = queryRequest.queryParam
     val label = queryParam.label
 
-    val edge = toRequestEdge(queryRequest)
     val serializer = if (queryParam.tgtVertexInnerIdOpt.isDefined) {
       val snapshotEdge = edge.toSnapshotEdge
       snapshotEdgeSerializer(snapshotEdge)
     } else {
-      val indexEdge = IndexEdge(edge.srcVertex, edge.tgtVertex, edge.labelWithDir,
+      val indexEdge = IndexEdge(edge.srcVertex, edge.tgtVertex, edge.label, edge.dir,
         edge.op, edge.version, queryParam.labelOrderSeq, edge.propsWithTs)
       indexEdgeSerializer(indexEdge)
     }
 
-    val (rowKey, qualifier) = (serializer.toRowKey, serializer.toQualifier)
+    val rowKey = serializer.toRowKey
+    val (minTs, maxTs) = queryParam.durationOpt.getOrElse((0L, Long.MaxValue))
 
-    val (minTs, maxTs) = queryParam.duration.getOrElse((0L, Long.MaxValue))
+    val (intervalMaxBytes, intervalMinBytes) = queryParam.buildInterval(Option(edge))
 
     label.schemaVersion match {
       case HBaseType.VERSION4 if queryParam.tgtVertexInnerIdOpt.isEmpty =>
@@ -215,28 +275,26 @@ class AsynchbaseStorage(override val graph: Graph,
         val srcIdBytes = VertexId.toSourceVertexId(indexEdge.srcVertex.id).bytes
         val labelWithDirBytes = indexEdge.labelWithDir.bytes
         val labelIndexSeqWithIsInvertedBytes = StorageSerializable.labelOrderSeqWithIsInverted(indexEdge.labelIndexSeq, isInverted = false)
-        //        val labelIndexSeqWithIsInvertedStopBytes = StorageSerializable.labelOrderSeqWithIsInverted(indexEdge.labelIndexSeq, isInverted = true)
-        val baseKey = Bytes.add(srcIdBytes, labelWithDirBytes, Bytes.add(labelIndexSeqWithIsInvertedBytes, Array.fill(1)(edge.op)))
+        val baseKey = Bytes.add(srcIdBytes, labelWithDirBytes, labelIndexSeqWithIsInvertedBytes)
+
         val (startKey, stopKey) =
-          if (queryParam.columnRangeFilter != null) {
+          if (queryParam.intervalOpt.isDefined) {
+            // interval is set.
            val _startKey = queryParam.cursorOpt match {
-              case Some(cursor) => Bytes.add(Base64.getDecoder.decode(cursor), Array.fill(1)(0))
-              case None => Bytes.add(baseKey, queryParam.columnRangeFilterMinBytes)
+              case Some(cursor) => Base64.getDecoder.decode(cursor)
+              case None => Bytes.add(baseKey, intervalMaxBytes)
            }
-            (_startKey, Bytes.add(baseKey, queryParam.columnRangeFilterMaxBytes))
+            (_startKey , Bytes.add(baseKey, intervalMinBytes))
           } else {
-            /*
-             * note: since propsToBytes encode size of property map at first byte, we are sure about max value here
-             */
+            /**
+             * note: since propsToBytes encode size of property map at first byte, we are sure about max value here
+             */
             val _startKey = queryParam.cursorOpt match {
-              case Some(cursor) => Bytes.add(Base64.getDecoder.decode(cursor), Array.fill(1)(0))
+              case Some(cursor) => Base64.getDecoder.decode(cursor)
               case None => baseKey
             }
             (_startKey, Bytes.add(baseKey, Array.fill(1)(-1)))
           }
 
-//        logger.debug(s"[StartKey]: ${startKey.toList}")
-//        logger.debug(s"[StopKey]: ${stopKey.toList}")
 
         scanner.setStartKey(startKey)
         scanner.setStopKey(stopKey)
@@ -244,15 +302,23 @@ class AsynchbaseStorage(override val graph: Graph,
         if (queryParam.limit == Int.MinValue) logger.debug(s"MinValue: $queryParam")
 
         scanner.setMaxVersions(1)
-        scanner.setMaxNumRows(queryParam.offset + queryParam.limit)
+        // TODO: exclusive condition innerOffset with cursorOpt
+        if (queryParam.cursorOpt.isDefined) {
+          scanner.setMaxNumRows(getV4MaxNumRows(queryParam.limit))
+        } else {
+          scanner.setMaxNumRows(getV4MaxNumRows(queryParam.innerOffset + queryParam.innerLimit))
+        }
         scanner.setMaxTimestamp(maxTs)
         scanner.setMinTimestamp(minTs)
-        scanner.setRpcTimeout(queryParam.rpcTimeoutInMillis)
+        scanner.setRpcTimeout(queryParam.rpcTimeout)
+
         // SET option for this rpc properly.
-        scanner
+        if (queryParam.cursorOpt.isDefined) Right(ScanWithRange(scanner, 0, queryParam.limit))
+        else Right(ScanWithRange(scanner, 0, queryParam.innerOffset + queryParam.innerLimit))
+
       case _ =>
         val get = if (queryParam.tgtVertexInnerIdOpt.isDefined) {
-          new GetRequest(label.hbaseTableName.getBytes, rowKey, edgeCf, qualifier)
+          new GetRequest(label.hbaseTableName.getBytes, rowKey, edgeCf, serializer.toQualifier)
         } else {
          new GetRequest(label.hbaseTableName.getBytes, rowKey, edgeCf)
        }
@@ -261,12 +327,14 @@ class AsynchbaseStorage(override val graph: Graph,
         get.setFailfast(true)
         get.setMinTimestamp(minTs)
         get.setMaxTimestamp(maxTs)
-        get.setTimeout(queryParam.rpcTimeoutInMillis)
+        get.setTimeout(queryParam.rpcTimeout)
 
         val pagination = new ColumnPaginationFilter(queryParam.limit, queryParam.offset)
-        get.setFilter(new FilterList(pagination +: Option(queryParam.columnRangeFilter).toSeq, MUST_PASS_ALL))
-
-        get
+        val columnRangeFilterOpt = queryParam.intervalOpt.map { interval =>
+          new ColumnRangeFilter(intervalMaxBytes, true, intervalMinBytes, true)
+        }
+        get.setFilter(new FilterList(pagination +: columnRangeFilterOpt.toSeq, MUST_PASS_ALL))
+        Left(get)
     }
  }
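
For VERSION4, buildRequest derives the scan window from the index interval and an optional Base64 cursor: with a cursor the scan resumes at the decoded key, otherwise it starts at baseKey ++ intervalMaxBytes, and it always stops at baseKey ++ intervalMinBytes (max before min, matching the diff; the interval bytes are assumed to sort descending). A hedged sketch of just that key arithmetic, with illustrative names:

object ScanRangeSketch {
  import java.util.Base64

  def scanRange(baseKey: Array[Byte],
                intervalMaxBytes: Array[Byte],
                intervalMinBytes: Array[Byte],
                cursorOpt: Option[String]): (Array[Byte], Array[Byte]) = {
    val startKey = cursorOpt match {
      case Some(cursor) => Base64.getDecoder.decode(cursor) // resume exactly where the last page stopped
      case None         => baseKey ++ intervalMaxBytes      // top of the interval
    }
    (startKey, baseKey ++ intervalMinBytes)                 // bottom of the interval as stop key
  }

  def main(args: Array[String]): Unit = {
    val base = Array[Byte](1, 2, 3)
    val (start, stop) = scanRange(base, Array[Byte](9), Array[Byte](0), cursorOpt = None)
    assert(start.toSeq == Seq[Byte](1, 2, 3, 9) && stop.toSeq == Seq[Byte](1, 2, 3, 0))
  }
}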
@@ -274,81 +342,81 @@ class AsynchbaseStorage(override val graph: Graph,
    * we are using future cache to squash requests into same key on storage.
    *
    * @param queryRequest
-   * @param prevStepScore
    * @param isInnerCall
    * @param parentEdges
    * @return we use Deferred here since it has much better performrance compared to scala.concurrent.Future.
    *         seems like map, flatMap on scala.concurrent.Future is slower than Deferred's addCallback
    */
   override def fetch(queryRequest: QueryRequest,
-                     prevStepScore: Double,
                      isInnerCall: Boolean,
-                     parentEdges: Seq[EdgeWithScore]): Deferred[StepInnerResult] = {
+                     parentEdges: Seq[EdgeWithScore]): Deferred[StepResult] = {
 
-    def fetchInner(hbaseRpc: AnyRef): Deferred[StepInnerResult] = {
-      val queryParam = queryRequest.queryParam
+    def fetchInner(hbaseRpc: AsyncRPC): Deferred[StepResult] = {
+      val prevStepScore = queryRequest.prevStepScore
+      val fallbackFn: (Exception => StepResult) = { ex =>
+        logger.error(s"fetchInner failed. fallback return. $hbaseRpc}", ex)
+        StepResult.Failure
+      }
 
-      fetchKeyValuesInner(hbaseRpc).withCallback { kvs =>
-        val (startOffset, length) = queryParam.label.schemaVersion match {
-          case HBaseType.VERSION4 => (queryParam.offset, queryParam.limit)
+      val queryParam = queryRequest.queryParam
+      fetchKeyValuesInner(hbaseRpc).mapWithFallback(emptyKeyValues)(fallbackFn) { kvs =>
+        val (startOffset, len) = queryParam.label.schemaVersion match {
+          case HBaseType.VERSION4 =>
+            val offset = if (queryParam.cursorOpt.isDefined) 0 else queryParam.offset
+            (offset, queryParam.limit)
           case _ => (0, kvs.length)
         }
 
-        val edgeWithScores = toEdges(kvs, queryParam, prevStepScore, isInnerCall, parentEdges, startOffset, length)
-        if (edgeWithScores.isEmpty) StepInnerResult.Empty
-        else {
-          val head = edgeWithScores.head
-          val (degreeEdges, indexEdges) =
-            if (head.edge.isDegree) (Seq(head), edgeWithScores.tail)
-            else (Nil, edgeWithScores)
-
-          val normalized =
-            if (queryRequest.queryParam.shouldNormalize) normalize(indexEdges)
-            else indexEdges
-
-          val sampled = if (queryRequest.queryParam.sample >= 0) {
-            sample(queryRequest, normalized, queryRequest.queryParam.sample)
-          } else normalized
-
-          StepInnerResult(edgesWithScoreLs = sampled, degreeEdges)
-        }
-      } recoverWith { ex =>
-        logger.error(s"fetchInner failed. fallback return. $hbaseRpc}", ex)
-        StepInnerResult.Failure
+        toEdges(kvs, queryRequest, prevStepScore, isInnerCall, parentEdges, startOffset, len)
       }
     }
 
     val queryParam = queryRequest.queryParam
     val cacheTTL = queryParam.cacheTTLInMillis
-    val request = buildRequest(queryRequest)
+    /** with version 4, request's type is (Scanner, (Int, Int)). otherwise GetRequest. */
+
+    val edge = toRequestEdge(queryRequest, parentEdges)
+    val request = buildRequest(queryRequest, edge)
+
+    val (intervalMaxBytes, intervalMinBytes) = queryParam.buildInterval(Option(edge))
+    val requestCacheKey = Bytes.add(toCacheKeyBytes(request), intervalMaxBytes, intervalMinBytes)
 
     if (cacheTTL <= 0) fetchInner(request)
     else {
-      val cacheKeyBytes = Bytes.add(queryRequest.query.cacheKeyBytes, toCacheKeyBytes(request))
+      val cacheKeyBytes = Bytes.add(queryRequest.query.queryOption.cacheKeyBytes, requestCacheKey)
+
+//      val cacheKeyBytes = toCacheKeyBytes(request)
       val cacheKey = queryParam.toCacheKey(cacheKeyBytes)
       futureCache.getOrElseUpdate(cacheKey, cacheTTL)(fetchInner(request))
     }
   }
 
+  override def fetches(queryRequests: Seq[QueryRequest],
+                       prevStepEdges: Map[VertexId, Seq[EdgeWithScore]]): Future[Seq[StepResult]] = {
+    val defers: Seq[Deferred[StepResult]] = for {
+      queryRequest <- queryRequests
+    } yield {
+      val queryOption = queryRequest.query.queryOption
+      val queryParam = queryRequest.queryParam
+      val shouldBuildParents = queryOption.returnTree || queryParam.whereHasParent
+      val parentEdges = if (shouldBuildParents) prevStepEdges.getOrElse(queryRequest.vertex.id, Nil) else Nil
+      fetch(queryRequest, isInnerCall = false, parentEdges)
+    }
 
-  override def fetches(queryRequestWithScoreLs: scala.Seq[(QueryRequest, Double)],
-                       prevStepEdges: Predef.Map[VertexId, scala.Seq[EdgeWithScore]]): Future[scala.Seq[StepInnerResult]] = {
-    val defers: Seq[Deferred[StepInnerResult]] = for {
-      (queryRequest, prevStepScore) <- queryRequestWithScoreLs
-      parentEdges <- prevStepEdges.get(queryRequest.vertex.id)
-    } yield fetch(queryRequest, prevStepScore, isInnerCall = false, parentEdges)
-
-    val grouped: Deferred[util.ArrayList[StepInnerResult]] = Deferred.group(defers)
-    grouped withCallback {
-      queryResults: util.ArrayList[StepInnerResult] =>
-        queryResults.toIndexedSeq
-    } toFuture
+    val grouped: Deferred[util.ArrayList[StepResult]] = Deferred.groupInOrder(defers)
+    grouped.map(emptyStepResult) { queryResults: util.ArrayList[StepResult] =>
+      queryResults.toSeq
+    }.toFuture(emptyStepResult)
   }
 
-  def fetchVertexKeyValues(request: AnyRef): Future[Seq[SKeyValue]] = fetchSnapshotEdgeKeyValues(request)
+  def fetchVertexKeyValues(request: QueryRequest): Future[Seq[SKeyValue]] = {
+    val edge = toRequestEdge(request, Nil)
+    fetchKeyValues(buildRequest(request, edge))
+  }
 
+  def fetchVertexKeyValues(request: AsyncRPC): Future[Seq[SKeyValue]] = fetchKeyValues(request)
+
   /**
    * when withWait is given, we use client with flushInterval set to 0.
    * if we are not using this, then we are adding extra wait time as much as flushInterval in worst case.
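
futureCache.getOrElseUpdate squashes concurrent fetches on the same cache key into one in-flight request, so a hot key produces a single HBase RPC. A minimal model of that idea using a ConcurrentHashMap of shared Futures (an assumption-level sketch of the squashing only, not DeferCache's actual implementation, which also honors cacheTTL and emits metrics):

import java.util.concurrent.ConcurrentHashMap
import scala.concurrent.{ExecutionContext, Future}

class SquashCacheSketch[K, V](implicit ec: ExecutionContext) {
  private val inFlight = new ConcurrentHashMap[K, Future[V]]()

  // Callers racing on the same key all receive the same Future.
  def getOrElseUpdate(key: K)(fetch: => Future[V]): Future[V] =
    inFlight.computeIfAbsent(key, _ => {
      val f = fetch
      f.onComplete(_ => inFlight.remove(key)) // real DeferCache keeps entries alive for cacheTTL instead
      f
    })
}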
@@ -357,35 +425,44 @@ class AsynchbaseStorage(override val graph: Graph,
    * @param withWait
    * @return
    */
-  override def incrementCounts(edges: Seq[Edge], withWait: Boolean): Future[Seq[(Boolean, Long)]] = {
+  override def incrementCounts(edges: Seq[Edge], withWait: Boolean): Future[Seq[(Boolean, Long, Long)]] = {
+
     val _client = client(withWait)
-    val defers: Seq[Deferred[(Boolean, Long)]] = for {
+    val defers: Seq[Deferred[(Boolean, Long, Long)]] = for {
       edge <- edges
     } yield {
-      val edgeWithIndex = edge.edgesWithIndex.head
-      val countWithTs = edge.propsWithTs(LabelMeta.countSeq)
-      val countVal = countWithTs.innerVal.toString().toLong
-      val kv = buildIncrementsCountAsync(edgeWithIndex, countVal).head
-      val request = new AtomicIncrementRequest(kv.table, kv.row, kv.cf, kv.qualifier, Bytes.toLong(kv.value))
-      val defer = _client.bufferAtomicIncrement(request) withCallback { resultCount: java.lang.Long =>
-        (true, resultCount.longValue())
-      } recoverWith { ex =>
-        logger.error(s"mutation failed. $request", ex)
-        (false, -1L)
+      val futures: List[Deferred[(Boolean, Long, Long)]] = for {
+        relEdge <- edge.relatedEdges
+        edgeWithIndex <- relEdge.edgesWithIndexValid
+      } yield {
+        val countWithTs = edge.propsWithTs(LabelMeta.count)
+        val countVal = countWithTs.innerVal.toString().toLong
+        val kv = buildIncrementsCountAsync(edgeWithIndex, countVal).head
+        val request = new AtomicIncrementRequest(kv.table, kv.row, kv.cf, kv.qualifier, Bytes.toLong(kv.value))
+        val fallbackFn: (Exception => (Boolean, Long, Long)) = { ex =>
+          logger.error(s"mutation failed. $request", ex)
+          (false, -1L, -1L)
+        }
+        val defer = _client.bufferAtomicIncrement(request).mapWithFallback(0L)(fallbackFn) { resultCount: java.lang.Long =>
+          (true, resultCount.longValue(), countVal)
+        }
+        if (withWait) defer
+        else Deferred.fromResult((true, -1L, -1L))
       }
-      if (withWait) defer
-      else Deferred.fromResult((true, -1L))
+
+      val grouped: Deferred[util.ArrayList[(Boolean, Long, Long)]] = Deferred.group(futures)
+      grouped.map(new util.ArrayList[(Boolean, Long, Long)]()) { resultLs => resultLs.head }
     }
 
-    val grouped: Deferred[util.ArrayList[(Boolean, Long)]] = Deferred.groupInOrder(defers)
-    grouped.toFuture.map(_.toSeq)
+    val grouped: Deferred[util.ArrayList[(Boolean, Long, Long)]] = Deferred.groupInOrder(defers)
+    grouped.toFuture(new util.ArrayList[(Boolean, Long, Long)]()).map(_.toSeq)
   }
 
   override def flush(): Unit = clients.foreach { client =>
     super.flush()
     val timeout = Duration((clientFlushInterval + 10) * 20, duration.MILLISECONDS)
-    Await.result(client.flush().toFuture, timeout)
+    Await.result(client.flush().toFuture(new AnyRef), timeout)
   }
 
@@ -394,40 +471,37 @@ class AsynchbaseStorage(override val graph: Graph,
                            cfs: List[String],
                            regionMultiplier: Int,
                            ttl: Option[Int],
-                           compressionAlgorithm: String): Unit = {
+                           compressionAlgorithm: String,
+                           replicationScopeOpt: Option[Int] = None,
+                           totalRegionCount: Option[Int] = None): Unit = {
     /** TODO: Decide if we will allow each app server to connect to multiple hbase cluster */
     for {
       zkAddr <- Seq(zkQuorum) ++ zkQuorumSlave.toSeq
     } {
       logger.info(s"create table: $tableName on $zkAddr, $cfs, $regionMultiplier, $compressionAlgorithm")
       val admin = getAdmin(zkAddr)
-      val regionCount = admin.getClusterStatus.getServersSize * regionMultiplier
+      val regionCount = totalRegionCount.getOrElse(admin.getClusterStatus.getServersSize * regionMultiplier)
       try {
         if (!admin.tableExists(TableName.valueOf(tableName))) {
-          try {
-            val desc = new HTableDescriptor(TableName.valueOf(tableName))
-            desc.setDurability(Durability.ASYNC_WAL)
-            for (cf <- cfs) {
-              val columnDesc = new HColumnDescriptor(cf)
-                .setCompressionType(Algorithm.valueOf(compressionAlgorithm.toUpperCase))
-                .setBloomFilterType(BloomType.ROW)
-                .setDataBlockEncoding(DataBlockEncoding.FAST_DIFF)
-                .setMaxVersions(1)
-                .setTimeToLive(2147483647)
-                .setMinVersions(0)
-                .setBlocksize(32768)
-                .setBlockCacheEnabled(true)
-              if (ttl.isDefined) columnDesc.setTimeToLive(ttl.get)
-              desc.addFamily(columnDesc)
-            }
-
-            if (regionCount <= 1) admin.createTable(desc)
-            else admin.createTable(desc, getStartKey(regionCount), getEndKey(regionCount), regionCount)
-          } catch {
-            case e: Throwable =>
-              logger.error(s"$zkAddr, $tableName failed with $e", e)
-              throw e
+          val desc = new HTableDescriptor(TableName.valueOf(tableName))
+          desc.setDurability(Durability.ASYNC_WAL)
+          for (cf <- cfs) {
+            val columnDesc = new HColumnDescriptor(cf)
+              .setCompressionType(Algorithm.valueOf(compressionAlgorithm.toUpperCase))
+              .setBloomFilterType(BloomType.ROW)
+              .setDataBlockEncoding(DataBlockEncoding.FAST_DIFF)
+              .setMaxVersions(1)
+              .setTimeToLive(2147483647)
+              .setMinVersions(0)
+              .setBlocksize(32768)
+              .setBlockCacheEnabled(true)
+            if (ttl.isDefined) columnDesc.setTimeToLive(ttl.get)
+            if (replicationScopeOpt.isDefined) columnDesc.setScope(replicationScopeOpt.get)
+            desc.addFamily(columnDesc)
           }
+
+          if (regionCount <= 1) admin.createTable(desc)
+          else admin.createTable(desc, getStartKey(regionCount), getEndKey(regionCount), regionCount)
         } else {
           logger.info(s"$zkAddr, $tableName, $cfs already exist.")
         }
@@ -445,12 +519,12 @@ class AsynchbaseStorage(override val graph: Graph,
 
   /** Asynchbase implementation override default getVertices to use future Cache */
   override def getVertices(vertices: Seq[Vertex]): Future[Seq[Vertex]] = {
-    def fromResult(queryParam: QueryParam,
-                   kvs: Seq[SKeyValue],
+    def fromResult(kvs: Seq[SKeyValue],
                    version: String): Option[Vertex] = {
       if (kvs.isEmpty) None
-      else vertexDeserializer.fromKeyValues(queryParam, kvs, version, None)
+      else vertexDeserializer.fromKeyValues(None, kvs, version, None)
+//        .map(S2Vertex(graph, _))
     }
 
     val futures = vertices.map { vertex =>
@@ -461,55 +535,85 @@ class AsynchbaseStorage(override val graph: Graph,
       get.maxVersions(1)
 
       val cacheKey = MurmurHash3.stringHash(get.toString)
-      vertexCache.getOrElseUpdate(cacheKey, cacheTTL = 10000)(fetchVertexKeyValues(get)).map { kvs =>
-        fromResult(QueryParam.Empty, kvs, vertex.serviceColumn.schemaVersion)
+      vertexCache.getOrElseUpdate(cacheKey, cacheTTL = 10000)(fetchVertexKeyValues(Left(get))).map { kvs =>
+        fromResult(kvs, vertex.serviceColumn.schemaVersion)
       }
     }
 
     Future.sequence(futures).map { result => result.toList.flatten }
   }
 
+  class V4ResultHandler(scanner: Scanner, defer: Deferred[util.ArrayList[KeyValue]], offset: Int, limit : Int) extends Callback[Object, util.ArrayList[util.ArrayList[KeyValue]]] {
+    val results = new util.ArrayList[KeyValue]()
+    var offsetCount = 0
+
+    override def call(kvsLs: util.ArrayList[util.ArrayList[KeyValue]]): Object = {
+      try {
+        if (kvsLs == null) {
+          defer.callback(results)
+          Try(scanner.close())
+        } else {
+          val curRet = new util.ArrayList[KeyValue]()
+          kvsLs.foreach(curRet.addAll(_))
+          val prevOffset = offsetCount
+          offsetCount += curRet.size()
+
+          val nextRet = if(offsetCount > offset){
+            if(prevOffset < offset ) {
+              curRet.subList(offset - prevOffset, curRet.size())
+            } else{
+              curRet
+            }
+          } else{
+            emptyKeyValues
+          }
+
+          val needCount = limit - results.size()
+          if (needCount >= nextRet.size()) {
+            results.addAll(nextRet)
+          } else {
+            results.addAll(nextRet.subList(0, needCount))
+          }
+
+          if (results.size() < limit) {
+            scanner.nextRows().addCallback(this)
+          } else {
+            defer.callback(results)
+            Try(scanner.close())
+          }
+        }
+      } catch{
+        case ex: Exception =>
+          logger.error(s"fetchKeyValuesInner failed.", ex)
+          defer.callback(ex)
+          Try(scanner.close())
+      }
+    }
+  }
+
   /**
    * Private Methods which is specific to Asynchbase implementation.
    */
-  private def fetchKeyValuesInner(rpc: AnyRef): Deferred[util.ArrayList[KeyValue]] = {
+  private def fetchKeyValuesInner(rpc: AsyncRPC): Deferred[util.ArrayList[KeyValue]] = {
     rpc match {
-      case getRequest: GetRequest => client.get(getRequest)
-      case scanner: Scanner =>
-        scanner.nextRows().withCallback { kvsLs =>
-          val ls = new util.ArrayList[KeyValue]
-          if (kvsLs == null) {
-
-          } else {
-            kvsLs.foreach { kvs =>
-              if (kvs != null) kvs.foreach { kv => ls.add(kv) }
-              else {
-
-              }
-            }
-          }
-          scanner.close()
-          ls
-        }.recoverWith { ex =>
-          logger.error(s"fetchKeyValuesInner failed.", ex)
-          scanner.close()
-          emptyKeyValues
-        }
+      case Left(get) => client.get(get)
+      case Right(ScanWithRange(scanner, offset, limit)) =>
+        val deferred = new Deferred[util.ArrayList[KeyValue]]()
+        scanner.nextRows().addCallback(new V4ResultHandler(scanner, deferred, offset, limit))
+        deferred
      case _ => Deferred.fromError(new RuntimeException(s"fetchKeyValues failed. $rpc"))
    }
  }
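
V4ResultHandler pages through scanner batches while applying offset and limit on the client side: rows seen before the offset are dropped, and the callback re-arms itself via nextRows() until limit rows are collected or the scanner is exhausted. The same bookkeeping expressed as a pure function over an iterator of batches (a sketch of the logic, not the callback itself):

object ScanPagingSketch {
  def page[A](batches: Iterator[Seq[A]], offset: Int, limit: Int): Seq[A] = {
    val results = Vector.newBuilder[A]
    var collected = 0
    var seen = 0                       // mirrors offsetCount in V4ResultHandler
    while (collected < limit && batches.hasNext) {
      val batch = batches.next()
      val prevSeen = seen
      seen += batch.size
      val afterOffset =
        if (seen > offset) {
          // drop only the part of this batch that still falls before the offset
          if (prevSeen < offset) batch.drop(offset - prevSeen) else batch
        } else Seq.empty
      val take = afterOffset.take(limit - collected)
      results ++= take
      collected += take.size
    }
    results.result()
  }

  def main(args: Array[String]): Unit = {
    val batches = Iterator(Seq(1, 2, 3), Seq(4, 5, 6), Seq(7, 8, 9))
    assert(page(batches, offset = 2, limit = 4) == Vector(3, 4, 5, 6))
  }
}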
$hbaseRpc") - Array.empty[Byte] + throw new RuntimeException(s"toCacheKeyBytes: $hbaseRpc") } } @@ -520,8 +624,6 @@ class AsynchbaseStorage(override val graph: Graph, val principal = config.getString("principal") val keytab = config.getString("keytab") - - System.setProperty("java.security.auth.login.config", jaas) System.setProperty("java.security.krb5.conf", krb5Conf) // System.setProperty("sun.security.krb5.debug", "true") http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/66bdf1bc/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala index e2b7c2f..2428173 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala @@ -20,141 +20,125 @@ package org.apache.s2graph.core.storage.serde.indexedge.tall import org.apache.hadoop.hbase.util.Bytes -import org.apache.s2graph.core.mysqls.LabelMeta +import org.apache.s2graph.core.mysqls.{Label, LabelMeta} import org.apache.s2graph.core.storage.StorageDeserializable._ -import org.apache.s2graph.core.storage.{CanSKeyValue, Deserializable, SKeyValue, StorageDeserializable} +import org.apache.s2graph.core.storage.{CanSKeyValue, Deserializable, StorageDeserializable} import org.apache.s2graph.core.types._ -import org.apache.s2graph.core.{GraphUtil, IndexEdge, QueryParam, Vertex} +import org.apache.s2graph.core.utils.logger +import org.apache.s2graph.core.{GraphUtil, IndexEdge, Vertex} import scala.collection.immutable +object IndexEdgeDeserializable{ + def getNewInstance() = new IndexEdgeDeserializable() +} class IndexEdgeDeserializable(bytesToLongFunc: (Array[Byte], Int) => Long = bytesToLong) extends Deserializable[IndexEdge] { import StorageDeserializable._ - type QualifierRaw = (Array[(Byte, InnerValLike)], VertexId, Byte, Boolean, Int) - type ValueRaw = (Array[(Byte, InnerValLike)], Int) + type QualifierRaw = (Array[(LabelMeta, InnerValLike)], VertexId, Byte, Boolean, Int) + type ValueRaw = (Array[(LabelMeta, InnerValLike)], Int) - private def parseDegreeQualifier(kv: SKeyValue, version: String): QualifierRaw = { - // val degree = Bytes.toLong(kv.value) - val degree = bytesToLongFunc(kv.value, 0) - val idxPropsRaw = Array(LabelMeta.degreeSeq -> InnerVal.withLong(degree, version)) - val tgtVertexIdRaw = VertexId(HBaseType.DEFAULT_COL_ID, InnerVal.withStr("0", version)) - (idxPropsRaw, tgtVertexIdRaw, GraphUtil.operations("insert"), false, 0) - } + override def fromKeyValuesInner[T: CanSKeyValue](checkLabel: Option[Label], + _kvs: Seq[T], + schemaVer: String, + cacheElementOpt: Option[IndexEdge]): IndexEdge = { + + assert(_kvs.size == 1) + + // val kvs = _kvs.map { kv => implicitly[CanSKeyValue[T]].toSKeyValue(kv) } + val kv = implicitly[CanSKeyValue[T]].toSKeyValue(_kvs.head) +// logger.debug(s"[DES]: ${kv.toLogString}}") - private def parseQualifier(kv: SKeyValue, version: String): QualifierRaw = { - var qualifierLen = 0 + val version = kv.timestamp + // logger.debug(s"[Des]: ${kv.row.toList}, ${kv.qualifier.toList}, ${kv.value.toList}") var pos = 0 - val (idxPropsRaw, idxPropsLen, tgtVertexIdRaw, tgtVertexIdLen) = { - val (props, 
endAt) = bytesToProps(kv.qualifier, pos, version) + val (srcVertexId, srcIdLen) = SourceVertexId.fromBytes(kv.row, pos, kv.row.length, schemaVer) + pos += srcIdLen + val labelWithDir = LabelWithDirection(Bytes.toInt(kv.row, pos, 4)) + pos += 4 + val (labelIdxSeq, isInverted) = bytesToLabelIndexSeqWithIsInverted(kv.row, pos) + pos += 1 + + val label = checkLabel.getOrElse(Label.findById(labelWithDir.labelId)) +// val op = kv.row(pos) +// pos += 1 + + if (pos == kv.row.length) { + // degree + // val degreeVal = Bytes.toLong(kv.value) + val degreeVal = bytesToLongFunc(kv.value, 0) + val ts = kv.timestamp + val tsInnerValLikeWithTs = InnerValLikeWithTs.withLong(ts, ts, schemaVer) + val props = Map(LabelMeta.timestamp -> tsInnerValLikeWithTs, + LabelMeta.degree -> InnerValLikeWithTs.withLong(degreeVal, ts, schemaVer)) + val tgtVertexId = VertexId(HBaseType.DEFAULT_COL_ID, InnerVal.withStr("0", schemaVer)) + IndexEdge(Vertex(srcVertexId, ts), Vertex(tgtVertexId, ts), label, labelWithDir.dir, GraphUtil.defaultOpByte, ts, labelIdxSeq, props, tsInnerValOpt = Option(tsInnerValLikeWithTs.innerVal)) + } else { + // not degree edge + val (idxPropsRaw, endAt) = bytesToProps(kv.row, pos, schemaVer) pos = endAt - qualifierLen += endAt - val (tgtVertexId, tgtVertexIdLen) = if (endAt == kv.qualifier.length) { + + + val (tgtVertexIdRaw, tgtVertexIdLen) = if (endAt == kv.row.length - 1) { (HBaseType.defaultTgtVertexId, 0) } else { - TargetVertexId.fromBytes(kv.qualifier, endAt, kv.qualifier.length, version) + TargetVertexId.fromBytes(kv.row, endAt, kv.row.length - 1, schemaVer) } - qualifierLen += tgtVertexIdLen - (props, endAt, tgtVertexId, tgtVertexIdLen) - } - val (op, opLen) = - if (kv.qualifier.length == qualifierLen) (GraphUtil.defaultOpByte, 0) - else (kv.qualifier(qualifierLen), 1) + val op = kv.row(kv.row.length-1) + + val allProps = immutable.Map.newBuilder[LabelMeta, InnerValLikeWithTs] + val index = label.indicesMap.getOrElse(labelIdxSeq, throw new RuntimeException(s"invalid index seq: ${label.id.get}, ${labelIdxSeq}")) + + /** process indexProps */ + val size = idxPropsRaw.length + (0 until size).foreach { ith => + val meta = index.sortKeyTypesArray(ith) + val (k, v) = idxPropsRaw(ith) + if (k == LabelMeta.degree) allProps += LabelMeta.degree -> InnerValLikeWithTs(v, version) + else allProps += meta -> InnerValLikeWithTs(v, version) + } +// for { +// (meta, (k, v)) <- index.sortKeyTypes.zip(idxPropsRaw) +// } { +// if (k == LabelMeta.degree) allProps += LabelMeta.degree -> InnerValLikeWithTs(v, version) +// else { +// allProps += meta -> InnerValLikeWithTs(v, version) +// } +// } + + /** process props */ + if (op == GraphUtil.operations("incrementCount")) { + // val countVal = Bytes.toLong(kv.value) + val countVal = bytesToLongFunc(kv.value, 0) + allProps += (LabelMeta.count -> InnerValLikeWithTs.withLong(countVal, version, schemaVer)) + } else { + val (props, endAt) = bytesToKeyValues(kv.value, 0, kv.value.length, schemaVer, label) + props.foreach { case (k, v) => + allProps += (k -> InnerValLikeWithTs(v, version)) + } + } + val _mergedProps = allProps.result() + val (mergedProps, tsInnerValLikeWithTs) = _mergedProps.get(LabelMeta.timestamp) match { + case None => + val tsInnerVal = InnerValLikeWithTs.withLong(version, version, schemaVer) + val mergedProps = _mergedProps + (LabelMeta.timestamp -> InnerValLikeWithTs.withLong(version, version, schemaVer)) + (mergedProps, tsInnerVal) + case Some(tsInnerVal) => + (_mergedProps, tsInnerVal) + } +// val mergedProps = +// if 
(_mergedProps.contains(LabelMeta.timestamp)) _mergedProps +// else _mergedProps + (LabelMeta.timestamp -> InnerValLikeWithTs.withLong(version, version, schemaVer)) - qualifierLen += opLen + /** process tgtVertexId */ + val tgtVertexId = + mergedProps.get(LabelMeta.to) match { + case None => tgtVertexIdRaw + case Some(vId) => TargetVertexId(HBaseType.DEFAULT_COL_ID, vId.innerVal) + } - (idxPropsRaw, tgtVertexIdRaw, op, tgtVertexIdLen != 0, qualifierLen) - } - private def parseValue(kv: SKeyValue, version: String): ValueRaw = { - val (props, endAt) = bytesToKeyValues(kv.value, 0, kv.value.length, version) - (props, endAt) - } + IndexEdge(Vertex(srcVertexId, version), Vertex(tgtVertexId, version), label, labelWithDir.dir, op, version, labelIdxSeq, mergedProps, tsInnerValOpt = Option(tsInnerValLikeWithTs.innerVal)) - private def parseDegreeValue(kv: SKeyValue, version: String): ValueRaw = { - (Array.empty[(Byte, InnerValLike)], 0) + } } - - override def fromKeyValuesInner[T: CanSKeyValue](queryParam: QueryParam, - _kvs: Seq[T], - schemaVer: String, - cacheElementOpt: Option[IndexEdge]): IndexEdge = { - - assert(_kvs.size == 1) - - val kvs = _kvs.map { kv => implicitly[CanSKeyValue[T]].toSKeyValue(kv) } - - val kv = kvs.head - val version = kv.timestamp - // logger.debug(s"[Des]: ${kv.row.toList}, ${kv.qualifier.toList}, ${kv.value.toList}") - var pos = 0 - val (srcVertexId, srcIdLen) = SourceVertexId.fromBytes(kv.row, pos, kv.row.length, schemaVer) - pos += srcIdLen - val labelWithDir = LabelWithDirection(Bytes.toInt(kv.row, pos, 4)) - pos += 4 - val (labelIdxSeq, isInverted) = bytesToLabelIndexSeqWithIsInverted(kv.row, pos) - pos += 1 - - val op = kv.row(pos) - pos += 1 - - if (pos == kv.row.length) { - // degree - // val degreeVal = Bytes.toLong(kv.value) - val degreeVal = bytesToLongFunc(kv.value, 0) - val ts = kv.timestamp - val props = Map(LabelMeta.timeStampSeq -> InnerValLikeWithTs.withLong(ts, ts, schemaVer), - LabelMeta.degreeSeq -> InnerValLikeWithTs.withLong(degreeVal, ts, schemaVer)) - val tgtVertexId = VertexId(HBaseType.DEFAULT_COL_ID, InnerVal.withStr("0", schemaVer)) - IndexEdge(Vertex(srcVertexId, ts), Vertex(tgtVertexId, ts), labelWithDir, op, ts, labelIdxSeq, props) - } else { - // not degree edge - - - val (idxPropsRaw, endAt) = bytesToProps(kv.row, pos, schemaVer) - pos = endAt - val (tgtVertexIdRaw, tgtVertexIdLen) = if (endAt == kv.row.length) { - (HBaseType.defaultTgtVertexId, 0) - } else { - TargetVertexId.fromBytes(kv.row, endAt, kv.row.length, schemaVer) - } - - val allProps = immutable.Map.newBuilder[Byte, InnerValLikeWithTs] - val index = queryParam.label.indicesMap.getOrElse(labelIdxSeq, throw new RuntimeException(s"invalid index seq: ${queryParam.label.id.get}, ${labelIdxSeq}")) - - /* process indexProps */ - for { - (seq, (k, v)) <- index.metaSeqs.zip(idxPropsRaw) - } { - if (k == LabelMeta.degreeSeq) allProps += k -> InnerValLikeWithTs(v, version) - else allProps += seq -> InnerValLikeWithTs(v, version) - } - - /* process props */ - if (op == GraphUtil.operations("incrementCount")) { - // val countVal = Bytes.toLong(kv.value) - val countVal = bytesToLongFunc(kv.value, 0) - allProps += (LabelMeta.countSeq -> InnerValLikeWithTs.withLong(countVal, version, schemaVer)) - } else { - val (props, endAt) = bytesToKeyValues(kv.value, 0, kv.value.length, schemaVer) - props.foreach { case (k, v) => - allProps += (k -> InnerValLikeWithTs(v, version)) - } - } - val _mergedProps = allProps.result() - val mergedProps = - if (_mergedProps.contains(LabelMeta.timeStampSeq)) 
_mergedProps - else _mergedProps + (LabelMeta.timeStampSeq -> InnerValLikeWithTs.withLong(version, version, schemaVer)) - - /* process tgtVertexId */ - val tgtVertexId = - mergedProps.get(LabelMeta.toSeq) match { - case None => tgtVertexIdRaw - case Some(vId) => TargetVertexId(HBaseType.DEFAULT_COL_ID, vId.innerVal) - } - - - IndexEdge(Vertex(srcVertexId, version), Vertex(tgtVertexId, version), labelWithDir, op, version, labelIdxSeq, mergedProps) - - } - } -} + } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/66bdf1bc/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeSerializable.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeSerializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeSerializable.scala index a76bd1f..cd242dc 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeSerializable.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeSerializable.scala @@ -21,16 +21,16 @@ package org.apache.s2graph.core.storage.serde.indexedge.tall import org.apache.hadoop.hbase.util.Bytes import org.apache.s2graph.core.mysqls.LabelMeta -import org.apache.s2graph.core.storage.{SKeyValue, Serializable, StorageSerializable} +import org.apache.s2graph.core.storage.{Serializable, StorageSerializable} import org.apache.s2graph.core.types.VertexId import org.apache.s2graph.core.{GraphUtil, IndexEdge} +import org.apache.s2graph.core.storage.StorageSerializable._ - -class IndexEdgeSerializable(indexEdge: IndexEdge) extends Serializable[IndexEdge] { +class IndexEdgeSerializable(indexEdge: IndexEdge, longToBytes: Long => Array[Byte] = longToBytes) extends Serializable[IndexEdge] { import StorageSerializable._ - override val ts = indexEdge.version - override val table = indexEdge.label.hbaseTableName.getBytes() + override def ts = indexEdge.version + override def table = indexEdge.label.hbaseTableName.getBytes() def idxPropsMap = indexEdge.orders.toMap def idxPropsBytes = propsToBytes(indexEdge.orders) @@ -43,25 +43,25 @@ class IndexEdgeSerializable(indexEdge: IndexEdge) extends Serializable[IndexEdge val row = Bytes.add(srcIdBytes, labelWithDirBytes, labelIndexSeqWithIsInvertedBytes) // logger.error(s"${row.toList}\n${srcIdBytes.toList}\n${labelWithDirBytes.toList}\n${labelIndexSeqWithIsInvertedBytes.toList}") - val qualifier = - if (indexEdge.degreeEdge) Array.empty[Byte] - else - idxPropsMap.get(LabelMeta.toSeq) match { + if (indexEdge.degreeEdge) row + else { + val qualifier = idxPropsMap.get(LabelMeta.to) match { case None => Bytes.add(idxPropsBytes, VertexId.toTargetVertexId(indexEdge.tgtVertex.id).bytes) case Some(vId) => idxPropsBytes } - /* TODO search usage of op byte. 
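
In the tall format everything lives in the row key, so the deserializer above can classify a row purely by how many bytes remain after the fixed prefix: nothing left means a degree row whose value is the degree count, anything else is index props plus an optional target id and a trailing op byte (the patch moves the op byte from just after the prefix to the row's end). A toy sketch of that branch, with simplified sizes and types as assumptions:

object TallRowSketch {
  sealed trait Row
  case class DegreeRow(degree: Long) extends Row
  case class EdgeRow(propBytes: Seq[Byte], op: Byte) extends Row

  def decode(row: Array[Byte], value: Array[Byte], srcIdLen: Int): Row = {
    val pos = srcIdLen + 4 + 1              // src id + labelWithDir (4 bytes) + index/inverted byte
    if (pos == row.length)
      DegreeRow(java.nio.ByteBuffer.wrap(value).getLong) // degree rows keep the count in the value
    else {
      val op = row(row.length - 1)          // op byte now sits at the end of the row key
      EdgeRow(row.slice(pos, row.length - 1).toSeq, op)
    }
  }
}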
if there is no, then remove opByte */ - Bytes.add(row, Array.fill(1)(GraphUtil.defaultOpByte), qualifier) + val opByte = if (indexEdge.op == GraphUtil.operations("incrementCount")) indexEdge.op else GraphUtil.defaultOpByte + Bytes.add(row, qualifier, Array.fill(1)(opByte)) + } } override def toQualifier: Array[Byte] = Array.empty[Byte] override def toValue: Array[Byte] = if (indexEdge.degreeEdge) - Bytes.toBytes(indexEdge.props(LabelMeta.degreeSeq).innerVal.toString().toLong) + longToBytes(indexEdge.props(LabelMeta.degree).innerVal.toString().toLong) else if (indexEdge.op == GraphUtil.operations("incrementCount")) - Bytes.toBytes(indexEdge.props(LabelMeta.countSeq).innerVal.toString().toLong) + longToBytes(indexEdge.props(LabelMeta.count).innerVal.toString().toLong) else propsToKeyValues(indexEdge.metas.toSeq) } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/66bdf1bc/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala index eb3d765..534667b 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala @@ -19,122 +19,126 @@ package org.apache.s2graph.core.storage.serde.indexedge.wide -import org.apache.s2graph.core.mysqls.LabelMeta +import org.apache.s2graph.core.mysqls.{Label, LabelMeta} import org.apache.s2graph.core.storage.StorageDeserializable._ import org.apache.s2graph.core.storage._ import org.apache.s2graph.core.types._ -import org.apache.s2graph.core.{GraphUtil, IndexEdge, QueryParam, Vertex} -import scala.collection.immutable - -import scala.collection.immutable - +import org.apache.s2graph.core.{GraphUtil, IndexEdge, Vertex} import scala.collection.immutable class IndexEdgeDeserializable(bytesToLongFunc: (Array[Byte], Int) => Long = bytesToLong) extends Deserializable[IndexEdge] { - - - import StorageDeserializable._ - - type QualifierRaw = (Array[(Byte, InnerValLike)], VertexId, Byte, Boolean, Int) - type ValueRaw = (Array[(Byte, InnerValLike)], Int) - - private def parseDegreeQualifier(kv: SKeyValue, version: String): QualifierRaw = { - // val degree = Bytes.toLong(kv.value) - val degree = bytesToLongFunc(kv.value, 0) - val idxPropsRaw = Array(LabelMeta.degreeSeq -> InnerVal.withLong(degree, version)) - val tgtVertexIdRaw = VertexId(HBaseType.DEFAULT_COL_ID, InnerVal.withStr("0", version)) - (idxPropsRaw, tgtVertexIdRaw, GraphUtil.operations("insert"), false, 0) - } - - private def parseQualifier(kv: SKeyValue, version: String): QualifierRaw = { - var qualifierLen = 0 - var pos = 0 - val (idxPropsRaw, idxPropsLen, tgtVertexIdRaw, tgtVertexIdLen) = { - val (props, endAt) = bytesToProps(kv.qualifier, pos, version) - pos = endAt - qualifierLen += endAt - val (tgtVertexId, tgtVertexIdLen) = if (endAt == kv.qualifier.length) { - (HBaseType.defaultTgtVertexId, 0) - } else { - TargetVertexId.fromBytes(kv.qualifier, endAt, kv.qualifier.length, version) - } - qualifierLen += tgtVertexIdLen - (props, endAt, tgtVertexId, tgtVertexIdLen) - } - val (op, opLen) = - if (kv.qualifier.length == qualifierLen) (GraphUtil.defaultOpByte, 0) - else (kv.qualifier(qualifierLen), 
1) - - qualifierLen += opLen - - (idxPropsRaw, tgtVertexIdRaw, op, tgtVertexIdLen != 0, qualifierLen) - } - - private def parseValue(kv: SKeyValue, version: String): ValueRaw = { - val (props, endAt) = bytesToKeyValues(kv.value, 0, kv.value.length, version) - (props, endAt) - } - - private def parseDegreeValue(kv: SKeyValue, version: String): ValueRaw = { - (Array.empty[(Byte, InnerValLike)], 0) - } - - override def fromKeyValuesInner[T: CanSKeyValue](queryParam: QueryParam, - _kvs: Seq[T], - schemaVer: String, - cacheElementOpt: Option[IndexEdge]): IndexEdge = { - assert(_kvs.size == 1) - - val kvs = _kvs.map { kv => implicitly[CanSKeyValue[T]].toSKeyValue(kv) } - - val kv = kvs.head - val version = kv.timestamp - - val (srcVertexId, labelWithDir, labelIdxSeq, _, _) = cacheElementOpt.map { e => - (e.srcVertex.id, e.labelWithDir, e.labelIndexSeq, false, 0) - }.getOrElse(parseRow(kv, schemaVer)) - - val (idxPropsRaw, tgtVertexIdRaw, op, tgtVertexIdInQualifier, _) = - if (kv.qualifier.isEmpty) parseDegreeQualifier(kv, schemaVer) - else parseQualifier(kv, schemaVer) - - val allProps = immutable.Map.newBuilder[Byte, InnerValLikeWithTs] - val index = queryParam.label.indicesMap.getOrElse(labelIdxSeq, throw new RuntimeException(s"invalid index seq: ${queryParam.label.id.get}, ${labelIdxSeq}")) - - /* process indexProps */ - for { - (seq, (k, v)) <- index.metaSeqs.zip(idxPropsRaw) - } { - if (k == LabelMeta.degreeSeq) allProps += k -> InnerValLikeWithTs(v, version) - else allProps += seq -> InnerValLikeWithTs(v, version) - } - - /* process props */ - if (op == GraphUtil.operations("incrementCount")) { - // val countVal = Bytes.toLong(kv.value) - val countVal = bytesToLongFunc(kv.value, 0) - allProps += (LabelMeta.countSeq -> InnerValLikeWithTs.withLong(countVal, version, schemaVer)) - } else if (kv.qualifier.isEmpty) { - val countVal = bytesToLongFunc(kv.value, 0) - allProps += (LabelMeta.degreeSeq -> InnerValLikeWithTs.withLong(countVal, version, schemaVer)) - } else { - val (props, _) = bytesToKeyValues(kv.value, 0, kv.value.length, schemaVer) - props.foreach { case (k, v) => allProps += (k -> InnerValLikeWithTs(v, version)) } - } - - val _mergedProps = allProps.result() - val mergedProps = - if (_mergedProps.contains(LabelMeta.timeStampSeq)) _mergedProps - else _mergedProps + (LabelMeta.timeStampSeq -> InnerValLikeWithTs.withLong(version, version, schemaVer)) - - /* process tgtVertexId */ - val tgtVertexId = - mergedProps.get(LabelMeta.toSeq) match { - case None => tgtVertexIdRaw - case Some(vId) => TargetVertexId(HBaseType.DEFAULT_COL_ID, vId.innerVal) - } - - IndexEdge(Vertex(srcVertexId, version), Vertex(tgtVertexId, version), labelWithDir, op, version, labelIdxSeq, mergedProps) - - } -} + import StorageDeserializable._ + + type QualifierRaw = (Array[(LabelMeta, InnerValLike)], VertexId, Byte, Boolean, Int) + type ValueRaw = (Array[(LabelMeta, InnerValLike)], Int) + + private def parseDegreeQualifier(kv: SKeyValue, schemaVer: String): QualifierRaw = { + // val degree = Bytes.toLong(kv.value) + val degree = bytesToLongFunc(kv.value, 0) + val idxPropsRaw = Array(LabelMeta.degree -> InnerVal.withLong(degree, schemaVer)) + val tgtVertexIdRaw = VertexId(HBaseType.DEFAULT_COL_ID, InnerVal.withStr("0", schemaVer)) + (idxPropsRaw, tgtVertexIdRaw, GraphUtil.operations("insert"), false, 0) + } + + private def parseQualifier(kv: SKeyValue, schemaVer: String): QualifierRaw = { + var qualifierLen = 0 + var pos = 0 + val (idxPropsRaw, idxPropsLen, tgtVertexIdRaw, tgtVertexIdLen) = { + val (props, endAt) 
= bytesToProps(kv.qualifier, pos, schemaVer) + pos = endAt + qualifierLen += endAt + val (tgtVertexId, tgtVertexIdLen) = if (endAt == kv.qualifier.length) { + (HBaseType.defaultTgtVertexId, 0) + } else { + TargetVertexId.fromBytes(kv.qualifier, endAt, kv.qualifier.length, schemaVer) + } + qualifierLen += tgtVertexIdLen + (props, endAt, tgtVertexId, tgtVertexIdLen) + } + val (op, opLen) = + if (kv.qualifier.length == qualifierLen) (GraphUtil.defaultOpByte, 0) + else (kv.qualifier(qualifierLen), 1) + + qualifierLen += opLen + + (idxPropsRaw, tgtVertexIdRaw, op, tgtVertexIdLen != 0, qualifierLen) + } + + override def fromKeyValuesInner[T: CanSKeyValue](checkLabel: Option[Label], + _kvs: Seq[T], + schemaVer: String, + cacheElementOpt: Option[IndexEdge]): IndexEdge = { + assert(_kvs.size == 1) + +// val kvs = _kvs.map { kv => implicitly[CanSKeyValue[T]].toSKeyValue(kv) } + + val kv = implicitly[CanSKeyValue[T]].toSKeyValue(_kvs.head) + val version = kv.timestamp + + val (srcVertexId, labelWithDir, labelIdxSeq, _, _) = cacheElementOpt.map { e => + (e.srcVertex.id, e.labelWithDir, e.labelIndexSeq, false, 0) + }.getOrElse(parseRow(kv, schemaVer)) + + val label = checkLabel.getOrElse(Label.findById(labelWithDir.labelId)) + + val (idxPropsRaw, tgtVertexIdRaw, op, tgtVertexIdInQualifier, _) = + if (kv.qualifier.isEmpty) parseDegreeQualifier(kv, schemaVer) + else parseQualifier(kv, schemaVer) + + val allProps = immutable.Map.newBuilder[LabelMeta, InnerValLikeWithTs] + val index = label.indicesMap.getOrElse(labelIdxSeq, throw new RuntimeException(s"invalid index seq: ${label.id.get}, ${labelIdxSeq}")) + + /** process indexProps */ + val size = idxPropsRaw.length + (0 until size).foreach { ith => + val meta = index.sortKeyTypesArray(ith) + val (k, v) = idxPropsRaw(ith) + if (k == LabelMeta.degree) allProps += LabelMeta.degree -> InnerValLikeWithTs(v, version) + else allProps += meta -> InnerValLikeWithTs(v, version) + } +// for { +// (seq, (k, v)) <- index.sortKeyTypes.zip(idxPropsRaw) +// } { +// if (k == LabelMeta.degree) allProps += LabelMeta.degree -> InnerValLikeWithTs(v, version) +// else allProps += seq -> InnerValLikeWithTs(v, version) +// } + + /** process props */ + if (op == GraphUtil.operations("incrementCount")) { + // val countVal = Bytes.toLong(kv.value) + val countVal = bytesToLongFunc(kv.value, 0) + allProps += (LabelMeta.count -> InnerValLikeWithTs.withLong(countVal, version, schemaVer)) + } else if (kv.qualifier.isEmpty) { + val countVal = bytesToLongFunc(kv.value, 0) + allProps += (LabelMeta.degree -> InnerValLikeWithTs.withLong(countVal, version, schemaVer)) + } else { + val (props, _) = bytesToKeyValues(kv.value, 0, kv.value.length, schemaVer, label) + props.foreach { case (k, v) => + allProps += (k -> InnerValLikeWithTs(v, version)) + } + } + + val _mergedProps = allProps.result() + val (mergedProps, tsInnerValLikeWithTs) = _mergedProps.get(LabelMeta.timestamp) match { + case None => + val tsInnerVal = InnerValLikeWithTs.withLong(version, version, schemaVer) + val mergedProps = _mergedProps + (LabelMeta.timestamp -> InnerValLikeWithTs.withLong(version, version, schemaVer)) + (mergedProps, tsInnerVal) + case Some(tsInnerVal) => + (_mergedProps, tsInnerVal) + } +// val mergedProps = +// if (_mergedProps.contains(LabelMeta.timestamp)) _mergedProps +// else _mergedProps + (LabelMeta.timestamp -> InnerValLikeWithTs.withLong(version, version, schemaVer)) + + /** process tgtVertexId */ + val tgtVertexId = + mergedProps.get(LabelMeta.to) match { + case None => tgtVertexIdRaw + case 
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/66bdf1bc/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeSerializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeSerializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeSerializable.scala
index c700e53..211b159 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeSerializable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeSerializable.scala
@@ -24,12 +24,13 @@ import org.apache.s2graph.core.mysqls.LabelMeta
 import org.apache.s2graph.core.storage.{SKeyValue, Serializable, StorageSerializable}
 import org.apache.s2graph.core.types.VertexId
 import org.apache.s2graph.core.{GraphUtil, IndexEdge}
+import org.apache.s2graph.core.storage.StorageSerializable._

-class IndexEdgeSerializable(indexEdge: IndexEdge) extends Serializable[IndexEdge] {
+class IndexEdgeSerializable(indexEdge: IndexEdge, longToBytes: Long => Array[Byte] = longToBytes) extends Serializable[IndexEdge] {
   import StorageSerializable._

-  override val ts = indexEdge.version
-  override val table = indexEdge.label.hbaseTableName.getBytes()
+  override def ts = indexEdge.version
+  override def table = indexEdge.label.hbaseTableName.getBytes()

   def idxPropsMap = indexEdge.orders.toMap
   def idxPropsBytes = propsToBytes(indexEdge.orders)
@@ -49,7 +50,7 @@ class IndexEdgeSerializable(indexEdge: IndexEdge) extends Serializable[IndexEdge
       if (indexEdge.op == GraphUtil.operations("incrementCount")) {
         Bytes.add(idxPropsBytes, tgtIdBytes, Array.fill(1)(indexEdge.op))
       } else {
-        idxPropsMap.get(LabelMeta.toSeq) match {
+        idxPropsMap.get(LabelMeta.to) match {
           case None => Bytes.add(idxPropsBytes, tgtIdBytes)
           case Some(vId) => idxPropsBytes
         }
@@ -59,9 +60,9 @@ class IndexEdgeSerializable(indexEdge: IndexEdge) extends Serializable[IndexEdge
   override def toValue: Array[Byte] =
     if (indexEdge.degreeEdge)
-      Bytes.toBytes(indexEdge.props(LabelMeta.degreeSeq).innerVal.toString().toLong)
+      longToBytes(indexEdge.props(LabelMeta.degree).innerVal.toString().toLong)
     else if (indexEdge.op == GraphUtil.operations("incrementCount"))
-      Bytes.toBytes(indexEdge.props(LabelMeta.countSeq).innerVal.toString().toLong)
+      longToBytes(indexEdge.props(LabelMeta.count).innerVal.toString().toLong)
     else propsToKeyValues(indexEdge.metas.toSeq)
 }
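The serializer now receives its long encoder as a constructor parameter, defaulting to the longToBytes brought in by the new StorageSerializable._ import, so a storage backend can swap the byte layout without touching the serde logic. A hedged sketch of the injection point (the alternative encoder below is hypothetical, not part of the patch):

import org.apache.hadoop.hbase.util.Bytes

// Default behaviour: big-endian longs, as HBase's Bytes produces.
val defaultLongToBytes: Long => Array[Byte] = l => Bytes.toBytes(l)
// Hypothetical alternative a backend might inject, e.g. little-endian:
val littleEndian: Long => Array[Byte] = l => Bytes.toBytes(l).reverse

// new IndexEdgeSerializable(indexEdge)               // uses the default
// new IndexEdgeSerializable(indexEdge, littleEndian) // overrides the encoding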
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/66bdf1bc/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeDeserializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeDeserializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeDeserializable.scala
index 368e3f3..91b8db1 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeDeserializable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeDeserializable.scala
@@ -20,11 +20,11 @@ package org.apache.s2graph.core.storage.serde.snapshotedge.tall

 import org.apache.hadoop.hbase.util.Bytes
-import org.apache.s2graph.core.mysqls.{LabelIndex, LabelMeta}
+import org.apache.s2graph.core.mysqls.{Label, LabelIndex, LabelMeta}
 import org.apache.s2graph.core.storage.StorageDeserializable._
 import org.apache.s2graph.core.storage.{CanSKeyValue, Deserializable, SKeyValue, StorageDeserializable}
 import org.apache.s2graph.core.types.{HBaseType, LabelWithDirection, SourceAndTargetVertexIdPair, SourceVertexId}
-import org.apache.s2graph.core.{Edge, QueryParam, SnapshotEdge, Vertex}
+import org.apache.s2graph.core.{Edge, SnapshotEdge, Vertex}

 class SnapshotEdgeDeserializable extends Deserializable[SnapshotEdge] {
@@ -34,7 +34,7 @@ class SnapshotEdgeDeserializable extends Deserializable[SnapshotEdge] {
     (statusCode.toByte, op.toByte)
   }

-  override def fromKeyValuesInner[T: CanSKeyValue](queryParam: QueryParam,
+  override def fromKeyValuesInner[T: CanSKeyValue](checkLabel: Option[Label],
                                                    _kvs: Seq[T],
                                                    version: String,
                                                    cacheElementOpt: Option[SnapshotEdge]): SnapshotEdge = {
@@ -42,9 +42,10 @@
     assert(kvs.size == 1)
     val kv = kvs.head
-    val schemaVer = queryParam.label.schemaVersion
+    val label = checkLabel.get
+    val schemaVer = label.schemaVersion
     val cellVersion = kv.timestamp

-    /* rowKey */
+    /** rowKey */
     def parseRowV3(kv: SKeyValue, version: String) = {
       var pos = 0
       val (srcIdAndTgtId, srcIdAndTgtIdLen) = SourceAndTargetVertexIdPair.fromBytes(kv.row, pos, kv.row.length, version)
@@ -64,13 +65,14 @@
     val srcVertexId = SourceVertexId(HBaseType.DEFAULT_COL_ID, srcInnerId)
     val tgtVertexId = SourceVertexId(HBaseType.DEFAULT_COL_ID, tgtInnerId)

-    val (props, op, ts, statusCode, _pendingEdgeOpt) = {
+    val (props, op, ts, statusCode, _pendingEdgeOpt, tsInnerVal) = {
       var pos = 0
       val (statusCode, op) = statusCodeWithOp(kv.value(pos))
       pos += 1
-      val (props, endAt) = bytesToKeyValuesWithTs(kv.value, pos, schemaVer)
+      val (props, endAt) = bytesToKeyValuesWithTs(kv.value, pos, schemaVer, label)
       val kvsMap = props.toMap
-      val ts = kvsMap(LabelMeta.timeStampSeq).innerVal.toString.toLong
+      val tsInnerVal = kvsMap(LabelMeta.timestamp).innerVal
+      val ts = tsInnerVal.toString.toLong

       pos = endAt
       val _pendingEdgeOpt =
@@ -80,24 +82,24 @@
           pos += 1
 //          val versionNum = Bytes.toLong(kv.value, pos, 8)
 //          pos += 8
-          val (pendingEdgeProps, endAt) = bytesToKeyValuesWithTs(kv.value, pos, schemaVer)
+          val (pendingEdgeProps, endAt) = bytesToKeyValuesWithTs(kv.value, pos, schemaVer, label)
           pos = endAt
           val lockTs = Option(Bytes.toLong(kv.value, pos, 8))

           val pendingEdge =
             Edge(Vertex(srcVertexId, cellVersion),
               Vertex(tgtVertexId, cellVersion),
-              labelWithDir, pendingEdgeOp,
+              label, labelWithDir.dir, pendingEdgeOp,
               cellVersion, pendingEdgeProps.toMap,
-              statusCode = pendingEdgeStatusCode, lockTs = lockTs)
+              statusCode = pendingEdgeStatusCode, lockTs = lockTs, tsInnerValOpt = Option(tsInnerVal))

           Option(pendingEdge)
         }

-      (kvsMap, op, ts, statusCode, _pendingEdgeOpt)
+      (kvsMap, op, ts, statusCode, _pendingEdgeOpt, tsInnerVal)
     }

     SnapshotEdge(Vertex(srcVertexId, ts), Vertex(tgtVertexId, ts),
-      labelWithDir, op, cellVersion, props, statusCode = statusCode,
-      pendingEdgeOpt = _pendingEdgeOpt, lockTs = None)
+      label, labelWithDir.dir, op, cellVersion, props, statusCode = statusCode,
+      pendingEdgeOpt = _pendingEdgeOpt, lockTs = None, tsInnerValOpt = Option(tsInnerVal))
   }
 }
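The single status byte consumed by statusCodeWithOp above packs two values into one byte: the high nibble carries the status code and the low nibble the op, mirroring (((statusCode << 4) | op).toByte) on the serializer side. A minimal, self-contained model of the round trip:

// Pack/unpack model of the snapshot-edge status byte (high nibble = statusCode, low nibble = op).
def pack(statusCode: Byte, op: Byte): Byte = ((statusCode << 4) | op).toByte
def unpack(b: Byte): (Byte, Byte) = (((b >> 4) & 0xf).toByte, (b & 0xf).toByte)

val b = pack(2, 3)
assert(unpack(b) == ((2: Byte, 3: Byte)))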
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/66bdf1bc/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeSerializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeSerializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeSerializable.scala
index 4f7c17b..fc84469 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeSerializable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeSerializable.scala
@@ -29,8 +29,8 @@ import org.apache.s2graph.core.types.SourceAndTargetVertexIdPair
 class SnapshotEdgeSerializable(snapshotEdge: SnapshotEdge) extends Serializable[SnapshotEdge] {
   import StorageSerializable._

-  override val ts = snapshotEdge.version
-  override val table = snapshotEdge.label.hbaseTableName.getBytes()
+  override def ts = snapshotEdge.version
+  override def table = snapshotEdge.label.hbaseTableName.getBytes()

   def statusCodeWithOp(statusCode: Byte, op: Byte): Array[Byte] = {
     val byte = (((statusCode << 4) | op).toByte)
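The val-to-def change on ts and table in these serializers is small but deliberate: an overriding val is computed eagerly when the serializer is constructed, while a def defers the version and hbaseTableName lookups until a key-value is actually built. The motivation is inferred here, not stated in the patch; a generic sketch of the evaluation-timing difference (class names hypothetical):

class Eager(compute: () => Long)    { val ts: Long = compute() } // runs at construction
class Deferred(compute: () => Long) { def ts: Long = compute() } // runs per access

var calls = 0
val count = () => { calls += 1; 1L }
new Deferred(count); assert(calls == 0) // nothing evaluated yet
new Eager(count);    assert(calls == 1) // evaluated immediately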
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/66bdf1bc/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeDeserializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeDeserializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeDeserializable.scala
index 8aca2cf..8d95e77 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeDeserializable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeDeserializable.scala
@@ -20,11 +20,11 @@ package org.apache.s2graph.core.storage.serde.snapshotedge.wide

 import org.apache.hadoop.hbase.util.Bytes
-import org.apache.s2graph.core.mysqls.{LabelIndex, LabelMeta}
+import org.apache.s2graph.core.mysqls.{Label, LabelIndex, LabelMeta}
 import org.apache.s2graph.core.storage.StorageDeserializable._
 import org.apache.s2graph.core.storage.{CanSKeyValue, Deserializable, StorageDeserializable}
 import org.apache.s2graph.core.types.TargetVertexId
-import org.apache.s2graph.core.{Edge, QueryParam, SnapshotEdge, Vertex}
+import org.apache.s2graph.core.{Edge, SnapshotEdge, Vertex}

 class SnapshotEdgeDeserializable extends Deserializable[SnapshotEdge] {
@@ -34,7 +34,7 @@ class SnapshotEdgeDeserializable extends Deserializable[SnapshotEdge] {
     (statusCode.toByte, op.toByte)
   }

-  override def fromKeyValuesInner[T: CanSKeyValue](queryParam: QueryParam,
+  override def fromKeyValuesInner[T: CanSKeyValue](checkLabel: Option[Label],
                                                    _kvs: Seq[T],
                                                    version: String,
                                                    cacheElementOpt: Option[SnapshotEdge]): SnapshotEdge = {
@@ -42,21 +42,23 @@
     assert(kvs.size == 1)
     val kv = kvs.head
-    val schemaVer = queryParam.label.schemaVersion
+    val label = checkLabel.get
+    val schemaVer = label.schemaVersion
     val cellVersion = kv.timestamp

     val (srcVertexId, labelWithDir, _, _, _) = cacheElementOpt.map { e =>
       (e.srcVertex.id, e.labelWithDir, LabelIndex.DefaultSeq, true, 0)
     }.getOrElse(parseRow(kv, schemaVer))

-    val (tgtVertexId, props, op, ts, statusCode, _pendingEdgeOpt) = {
+    val (tgtVertexId, props, op, ts, statusCode, _pendingEdgeOpt, tsInnerVal) = {
       val (tgtVertexId, _) = TargetVertexId.fromBytes(kv.qualifier, 0, kv.qualifier.length, schemaVer)
       var pos = 0
       val (statusCode, op) = statusCodeWithOp(kv.value(pos))
       pos += 1
-      val (props, endAt) = bytesToKeyValuesWithTs(kv.value, pos, schemaVer)
+      val (props, endAt) = bytesToKeyValuesWithTs(kv.value, pos, schemaVer, label)
       val kvsMap = props.toMap
-      val ts = kvsMap(LabelMeta.timeStampSeq).innerVal.toString.toLong
+      val tsInnerVal = kvsMap(LabelMeta.timestamp).innerVal
+      val ts = tsInnerVal.toString.toLong

       pos = endAt
       val _pendingEdgeOpt =
@@ -66,24 +68,24 @@
           pos += 1
 //          val versionNum = Bytes.toLong(kv.value, pos, 8)
 //          pos += 8
-          val (pendingEdgeProps, endAt) = bytesToKeyValuesWithTs(kv.value, pos, schemaVer)
+          val (pendingEdgeProps, endAt) = bytesToKeyValuesWithTs(kv.value, pos, schemaVer, label)
           pos = endAt
           val lockTs = Option(Bytes.toLong(kv.value, pos, 8))

           val pendingEdge =
             Edge(Vertex(srcVertexId, cellVersion),
               Vertex(tgtVertexId, cellVersion),
-              labelWithDir, pendingEdgeOp,
+              label, labelWithDir.dir, pendingEdgeOp,
               cellVersion, pendingEdgeProps.toMap,
-              statusCode = pendingEdgeStatusCode, lockTs = lockTs)
+              statusCode = pendingEdgeStatusCode, lockTs = lockTs, tsInnerValOpt = Option(tsInnerVal))

           Option(pendingEdge)
         }

-      (tgtVertexId, kvsMap, op, ts, statusCode, _pendingEdgeOpt)
+      (tgtVertexId, kvsMap, op, ts, statusCode, _pendingEdgeOpt, tsInnerVal)
     }

     SnapshotEdge(Vertex(srcVertexId, ts), Vertex(tgtVertexId, ts),
-      labelWithDir, op, cellVersion, props, statusCode = statusCode,
-      pendingEdgeOpt = _pendingEdgeOpt, lockTs = None)
+      label, labelWithDir.dir, op, cellVersion, props, statusCode = statusCode,
+      pendingEdgeOpt = _pendingEdgeOpt, lockTs = None, tsInnerValOpt = Option(tsInnerVal))
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/66bdf1bc/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeSerializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeSerializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeSerializable.scala
index 757ef1b..4ceb4a8 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeSerializable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeSerializable.scala
@@ -34,8 +34,8 @@ import org.apache.s2graph.core.types.VertexId
 class SnapshotEdgeSerializable(snapshotEdge: SnapshotEdge) extends Serializable[SnapshotEdge] {
   import StorageSerializable._

-  override val ts = snapshotEdge.version
-  override val table = snapshotEdge.label.hbaseTableName.getBytes()
+  override def ts = snapshotEdge.version
+  override def table = snapshotEdge.label.hbaseTableName.getBytes()

   def statusCodeWithOp(statusCode: Byte, op: Byte): Array[Byte] = {
     val byte = (((statusCode << 4) | op).toByte)
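With props now keyed by LabelMeta instead of raw seq bytes, both snapshot-edge deserializers (tall and wide) recover the timestamp the same way and thread its raw inner value through as tsInnerValOpt. A hypothetical helper that isolates that shared step (not part of the patch; it assumes the props-map shape returned by bytesToKeyValuesWithTs):

import org.apache.s2graph.core.mysqls.LabelMeta
import org.apache.s2graph.core.types.{InnerValLike, InnerValLikeWithTs}

// Recover both the raw InnerVal (kept for tsInnerValOpt) and the numeric ts.
def timestampOf(props: Map[LabelMeta, InnerValLikeWithTs]): (InnerValLike, Long) = {
  val tsInnerVal = props(LabelMeta.timestamp).innerVal
  (tsInnerVal, tsInnerVal.toString.toLong)
}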
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/66bdf1bc/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/VertexDeserializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/VertexDeserializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/VertexDeserializable.scala
index 737c2a8..3ec17ab 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/VertexDeserializable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/VertexDeserializable.scala
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License. You may obtain a copy of the License at
- *
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -19,18 +19,19 @@ package org.apache.s2graph.core.storage.serde.vertex

-import org.apache.hadoop.hbase.util.Bytes
+import org.apache.s2graph.core.mysqls.Label
+import org.apache.s2graph.core.storage.StorageDeserializable._
 import org.apache.s2graph.core.storage.{CanSKeyValue, Deserializable}
 import org.apache.s2graph.core.types.{InnerVal, InnerValLike, VertexId}
 import org.apache.s2graph.core.{QueryParam, Vertex}

 import scala.collection.mutable.ListBuffer

-class VertexDeserializable extends Deserializable[Vertex] {
-  def fromKeyValuesInner[T: CanSKeyValue](queryParam: QueryParam,
-                                          _kvs: Seq[T],
-                                          version: String,
-                                          cacheElementOpt: Option[Vertex]): Vertex = {
+class VertexDeserializable(bytesToInt: (Array[Byte], Int) => Int = bytesToInt) extends Deserializable[Vertex] {
+  def fromKeyValuesInner[T: CanSKeyValue](checkLabel: Option[Label],
+                                          _kvs: Seq[T],
+                                          version: String,
+                                          cacheElementOpt: Option[Vertex]): Vertex = {
     val kvs = _kvs.map { kv => implicitly[CanSKeyValue[T]].toSKeyValue(kv) }
@@ -46,7 +47,7 @@
     } {
       val propKey =
         if (kv.qualifier.length == 1) kv.qualifier.head.toInt
-        else Bytes.toInt(kv.qualifier)
+        else bytesToInt(kv.qualifier, 0)

       val ts = kv.timestamp
       if (ts > maxTs) maxTs = ts
@@ -63,4 +64,3 @@
     Vertex(vertexId, maxTs, propsMap.toMap, belongLabelIds = belongLabelIds)
   }
 }
-

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/66bdf1bc/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/VertexSerializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/VertexSerializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/VertexSerializable.scala
index 6bb162c..77bbb87 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/VertexSerializable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/VertexSerializable.scala
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License. You may obtain a copy of the License at
- *
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -19,11 +19,12 @@ package org.apache.s2graph.core.storage.serde.vertex

-import org.apache.hadoop.hbase.util.Bytes
 import org.apache.s2graph.core.Vertex
+import org.apache.s2graph.core.storage.StorageSerializable._
 import org.apache.s2graph.core.storage.{SKeyValue, Serializable}
+import org.apache.s2graph.core.utils.logger

-case class VertexSerializable(vertex: Vertex) extends Serializable[Vertex] {
+case class VertexSerializable(vertex: Vertex, intToBytes: Int => Array[Byte] = intToBytes) extends Serializable[Vertex] {

   override val table = vertex.hbaseTableName.getBytes
   override val ts = vertex.ts
@@ -37,10 +38,10 @@ case class VertexSerializable(vertex: Vertex) extends Serializable[Vertex] {
   /** vertex override toKeyValues since vertex expect to produce multiple sKeyValues */
   override def toKeyValues: Seq[SKeyValue] = {
     val row = toRowKey
-    val base = for ((k, v) <- vertex.props ++ vertex.defaultProps) yield Bytes.toBytes(k) -> v.bytes
-    val belongsTo = vertex.belongLabelIds.map { labelId => Bytes.toBytes(Vertex.toPropKey(labelId)) -> Array.empty[Byte] }
+    val base = for ((k, v) <- vertex.props ++ vertex.defaultProps) yield intToBytes(k) -> v.bytes
+    val belongsTo = vertex.belongLabelIds.map { labelId => intToBytes(Vertex.toPropKey(labelId)) -> Array.empty[Byte] }
     (base ++ belongsTo).map { case (qualifier, value) =>
       SKeyValue(vertex.hbaseTableName.getBytes, row, cf, qualifier, value, vertex.ts)
     } toSeq
   }
-}
+}
\ No newline at end of file
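The vertex serde pair now round-trips prop keys through injectable int converters: intToBytes when writing qualifiers, bytesToInt when reading them back, with a one-byte fallback kept for qualifiers written by older code. A small sketch of that decode rule, assuming the defaults wrap HBase's Bytes (the helper name is illustrative):

import org.apache.hadoop.hbase.util.Bytes

// Mirror of the deserializer's rule: legacy 1-byte keys vs. 4-byte int keys.
def decodePropKey(qualifier: Array[Byte]): Int =
  if (qualifier.length == 1) qualifier.head.toInt
  else Bytes.toInt(qualifier, 0)

assert(decodePropKey(Bytes.toBytes(7)) == 7) // 4-byte qualifier
assert(decodePropKey(Array(7.toByte)) == 7)  // legacy 1-byte qualifier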
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/66bdf1bc/s2core/src/main/scala/org/apache/s2graph/core/types/HBaseType.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/types/HBaseType.scala b/s2core/src/main/scala/org/apache/s2graph/core/types/HBaseType.scala
index c03319d..b885bc6 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/types/HBaseType.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/types/HBaseType.scala
@@ -20,6 +20,7 @@ package org.apache.s2graph.core.types

 import org.apache.hadoop.hbase.util.Bytes
+import org.apache.s2graph.core.mysqls.LabelMeta

 object HBaseType {
   val VERSION4 = "v4"
@@ -28,7 +29,7 @@ object HBaseType {
   val VERSION1 = "v1"
 // val DEFAULT_VERSION = VERSION2
   val DEFAULT_VERSION = VERSION3
-  val EMPTY_SEQ_BYTE = Byte.MaxValue
+//  val EMPTY_SEQ_BYTE = Byte.MaxValue
   val DEFAULT_COL_ID = 0
   val bitsForDir = 2
   val maxBytes = Bytes.toBytes(Int.MaxValue)
@@ -100,7 +101,7 @@ object HBaseDeserializable {
     val kvs = new Array[(Byte, InnerValLike)](len)
     var i = 0
     while (i < len) {
-      val k = EMPTY_SEQ_BYTE
+      val k = LabelMeta.emptySeq
       val (v, numOfBytesUsed) = InnerVal.fromBytes(bytes, pos, 0, version)
       pos += numOfBytesUsed
      kvs(i) = (k -> v)
