Repository: incubator-s2graph Updated Branches: refs/heads/master e207f676f -> fd8119bc9
[S2GRAPH-53]: Refactor Storage to decide which serializer/deserializer to use for IndexEdge/SnapshotEdge/Vertex. Add serde package and change storage to contain a compatibility table. JIRA: [S2GRAPH-53] https://issues.apache.org/jira/browse/S2GRAPH-53 Pull Request: Closes #37 Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/fd8119bc Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/fd8119bc Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/fd8119bc Branch: refs/heads/master Commit: fd8119bc9dc1cabc07bcf8b7dc49258345f45a3e Parents: e207f67 Author: DO YUNG YOON <[email protected]> Authored: Fri Mar 4 23:49:30 2016 +0900 Committer: DO YUNG YOON <[email protected]> Committed: Fri Mar 4 23:49:30 2016 +0900 ---------------------------------------------------------------------- CHANGES | 3 + .../core/storage/IndexEdgeDeserializable.scala | 128 ----------------- .../core/storage/IndexEdgeSerializable.scala | 58 -------- .../storage/SnapshotEdgeDeserializable.scala | 142 ------------------- .../core/storage/SnapshotEdgeSerializable.scala | 76 ---------- .../kakao/s2graph/core/storage/Storage.scala | 59 ++++++-- .../core/storage/VertexDeserializable.scala | 46 ------ .../core/storage/VertexSerializable.scala | 18 --- .../tall/IndexEdgeDeserializable.scala | 132 +++++++++++++++++ .../indexedge/tall/IndexEdgeSerializable.scala | 53 +++++++ .../wide/IndexEdgeDeserializable.scala | 116 +++++++++++++++ .../indexedge/wide/IndexEdgeSerializable.scala | 53 +++++++ .../tall/SnapshotEdgeDeserializable.scala | 84 +++++++++++ .../tall/SnapshotEdgeSerializable.scala | 47 ++++++ .../wide/SnapshotEdgeDeserializable.scala | 70 +++++++++ .../wide/SnapshotEdgeSerializable.scala | 50 +++++++ .../serde/vertex/VertexDeserializable.scala | 46 ++++++ .../serde/vertex/VertexSerializable.scala | 20 +++ 18 files changed, 725 insertions(+), 476 deletions(-) 
---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/fd8119bc/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index 97434d6..a91718f 100644 --- a/CHANGES +++ b/CHANGES @@ -37,6 +37,9 @@ Release 0.12.1 - unreleased S2GRAPH-44: Provide cache for WhereParser on query (Committed by DOYUNG YOON). + S2GRAPH-53: Refactor Storage to decide which serializer/deserializer for IndexEdge/SnapshotEdge/Vertex + (Committed by DOYUNG YOON). + BUG FIXES S2GRAPH-18: Query Option "interval" is Broken. http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/fd8119bc/s2core/src/main/scala/com/kakao/s2graph/core/storage/IndexEdgeDeserializable.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/storage/IndexEdgeDeserializable.scala b/s2core/src/main/scala/com/kakao/s2graph/core/storage/IndexEdgeDeserializable.scala deleted file mode 100644 index 2190222..0000000 --- a/s2core/src/main/scala/com/kakao/s2graph/core/storage/IndexEdgeDeserializable.scala +++ /dev/null @@ -1,128 +0,0 @@ -package com.kakao.s2graph.core.storage - -import com.kakao.s2graph.core._ -import com.kakao.s2graph.core.mysqls.LabelMeta -import com.kakao.s2graph.core.storage.{CanSKeyValue, StorageDeserializable, SKeyValue} -import com.kakao.s2graph.core.types._ -import org.apache.hadoop.hbase.util.Bytes -import StorageDeserializable._ - -class IndexEdgeDeserializable(bytesToLongFunc: (Array[Byte], Int) => Long = bytesToLong) extends Deserializable[IndexEdge] { - - import StorageDeserializable._ - - type QualifierRaw = (Array[(Byte, InnerValLike)], VertexId, Byte, Boolean, Int) - type ValueRaw = (Array[(Byte, InnerValLike)], Int) - - private def parseDegreeQualifier(kv: SKeyValue, version: String): QualifierRaw = { -// val degree = Bytes.toLong(kv.value) - val degree = bytesToLongFunc(kv.value, 0) 
- val idxPropsRaw = Array(LabelMeta.degreeSeq -> InnerVal.withLong(degree, version)) - val tgtVertexIdRaw = VertexId(HBaseType.DEFAULT_COL_ID, InnerVal.withStr("0", version)) - (idxPropsRaw, tgtVertexIdRaw, GraphUtil.operations("insert"), false, 0) - } - - private def parseQualifier(kv: SKeyValue, version: String): QualifierRaw = { - var qualifierLen = 0 - var pos = 0 - val (idxPropsRaw, idxPropsLen, tgtVertexIdRaw, tgtVertexIdLen) = { - val (props, endAt) = bytesToProps(kv.qualifier, pos, version) - pos = endAt - qualifierLen += endAt - val (tgtVertexId, tgtVertexIdLen) = if (endAt == kv.qualifier.length) { - (HBaseType.defaultTgtVertexId, 0) - } else { - TargetVertexId.fromBytes(kv.qualifier, endAt, kv.qualifier.length, version) - } - qualifierLen += tgtVertexIdLen - (props, endAt, tgtVertexId, tgtVertexIdLen) - } - val (op, opLen) = - if (kv.qualifier.length == qualifierLen) (GraphUtil.defaultOpByte, 0) - else (kv.qualifier(qualifierLen), 1) - - qualifierLen += opLen - - (idxPropsRaw, tgtVertexIdRaw, op, tgtVertexIdLen != 0, qualifierLen) - } - - private def parseValue(kv: SKeyValue, version: String): ValueRaw = { - val (props, endAt) = bytesToKeyValues(kv.value, 0, kv.value.length, version) - (props, endAt) - } - - private def parseDegreeValue(kv: SKeyValue, version: String): ValueRaw = { - (Array.empty[(Byte, InnerValLike)], 0) - } - - - - /** version 1 and version 2 is same logic */ - override def fromKeyValuesInner[T: CanSKeyValue](queryParam: QueryParam, - _kvs: Seq[T], - version: String, - cacheElementOpt: Option[IndexEdge] = None): IndexEdge = { - fromKeyValuesInnerOld(queryParam, _kvs, version, cacheElementOpt) - } - - def fromKeyValuesInnerOld[T: CanSKeyValue](queryParam: QueryParam, - _kvs: Seq[T], - version: String, - cacheElementOpt: Option[IndexEdge] = None): IndexEdge = { - assert(_kvs.size == 1) - - val kvs = _kvs.map { kv => implicitly[CanSKeyValue[T]].toSKeyValue(kv) } - - val kv = kvs.head - val (srcVertexId, labelWithDir, labelIdxSeq, _, _) = 
cacheElementOpt.map { e => - (e.srcVertex.id, e.labelWithDir, e.labelIndexSeq, false, 0) - }.getOrElse(parseRow(kv, version)) - - val (idxPropsRaw, tgtVertexIdRaw, op, tgtVertexIdInQualifier, _) = - if (kv.qualifier.isEmpty) parseDegreeQualifier(kv, version) - else parseQualifier(kv, version) - - val (props, _) = if (op == GraphUtil.operations("incrementCount")) { -// val countVal = Bytes.toLong(kv.value) - val countVal = bytesToLongFunc(kv.value, 0) - val dummyProps = Array(LabelMeta.countSeq -> InnerVal.withLong(countVal, version)) - (dummyProps, 8) - } else if (kv.qualifier.isEmpty) { - parseDegreeValue(kv, version) - } else { - parseValue(kv, version) - } - - val index = queryParam.label.indicesMap.getOrElse(labelIdxSeq, throw new RuntimeException(s"invalid index seq: ${queryParam.label.id.get}, ${labelIdxSeq}")) - - - // assert(kv.qualifier.nonEmpty && index.metaSeqs.size == idxPropsRaw.size) - - val idxProps = for { - (seq, (k, v)) <- index.metaSeqs.zip(idxPropsRaw) - } yield { - if (k == LabelMeta.degreeSeq) k -> v - else seq -> v - } - - val idxPropsMap = idxProps.toMap - val tgtVertexId = if (tgtVertexIdInQualifier) { - idxPropsMap.get(LabelMeta.toSeq) match { - case None => tgtVertexIdRaw - case Some(vId) => TargetVertexId(HBaseType.DEFAULT_COL_ID, vId) - } - } else tgtVertexIdRaw - - val _mergedProps = (idxProps ++ props).toMap - val mergedProps = - if (_mergedProps.contains(LabelMeta.timeStampSeq)) _mergedProps - else _mergedProps + (LabelMeta.timeStampSeq -> InnerVal.withLong(kv.timestamp, version)) - - // logger.error(s"$mergedProps") - // val ts = mergedProps(LabelMeta.timeStampSeq).toString().toLong - - val ts = kv.timestamp - IndexEdge(Vertex(srcVertexId, ts), Vertex(tgtVertexId, ts), labelWithDir, op, ts, labelIdxSeq, mergedProps) - - } -} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/fd8119bc/s2core/src/main/scala/com/kakao/s2graph/core/storage/IndexEdgeSerializable.scala 
---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/storage/IndexEdgeSerializable.scala b/s2core/src/main/scala/com/kakao/s2graph/core/storage/IndexEdgeSerializable.scala deleted file mode 100644 index 56f70b9..0000000 --- a/s2core/src/main/scala/com/kakao/s2graph/core/storage/IndexEdgeSerializable.scala +++ /dev/null @@ -1,58 +0,0 @@ -package com.kakao.s2graph.core.storage - -import com.kakao.s2graph.core.mysqls.LabelMeta -import com.kakao.s2graph.core.storage.{StorageSerializable, SKeyValue} -import com.kakao.s2graph.core.types.{HBaseType, VertexId} -import com.kakao.s2graph.core.utils.logger -import com.kakao.s2graph.core.{GraphUtil, IndexEdge} -import org.apache.hadoop.hbase.util.Bytes - -case class IndexEdgeSerializable(indexEdge: IndexEdge) extends Serializable[IndexEdge] { - - import StorageSerializable._ - - val label = indexEdge.label - val table = label.hbaseTableName.getBytes() - val cf = Serializable.edgeCf - - val idxPropsMap = indexEdge.orders.toMap - val idxPropsBytes = propsToBytes(indexEdge.orders) - - /** version 1 and version 2 share same code for serialize row key part */ - override def toKeyValues: Seq[SKeyValue] = { - toKeyValuesInner - } - def toKeyValuesInner: Seq[SKeyValue] = { - val srcIdBytes = VertexId.toSourceVertexId(indexEdge.srcVertex.id).bytes - val labelWithDirBytes = indexEdge.labelWithDir.bytes - val labelIndexSeqWithIsInvertedBytes = labelOrderSeqWithIsInverted(indexEdge.labelIndexSeq, isInverted = false) - - val row = Bytes.add(srcIdBytes, labelWithDirBytes, labelIndexSeqWithIsInvertedBytes) - // logger.error(s"${row.toList}\n${srcIdBytes.toList}\n${labelWithDirBytes.toList}\n${labelIndexSeqWithIsInvertedBytes.toList}") - val tgtIdBytes = VertexId.toTargetVertexId(indexEdge.tgtVertex.id).bytes - val qualifier = - if (indexEdge.degreeEdge) Array.empty[Byte] - else { - if (indexEdge.op == GraphUtil.operations("incrementCount")) { - 
Bytes.add(idxPropsBytes, tgtIdBytes, Array.fill(1)(indexEdge.op)) - } else { - idxPropsMap.get(LabelMeta.toSeq) match { - case None => Bytes.add(idxPropsBytes, tgtIdBytes) - case Some(vId) => idxPropsBytes - } - } - } - - - val value = - if (indexEdge.degreeEdge) - Bytes.toBytes(indexEdge.propsWithTs(LabelMeta.degreeSeq).innerVal.toString().toLong) - else if (indexEdge.op == GraphUtil.operations("incrementCount")) - Bytes.toBytes(indexEdge.propsWithTs(LabelMeta.countSeq).innerVal.toString().toLong) - else propsToKeyValues(indexEdge.metas.toSeq) - - val kv = SKeyValue(table, row, cf, qualifier, value, indexEdge.version) - - Seq(kv) - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/fd8119bc/s2core/src/main/scala/com/kakao/s2graph/core/storage/SnapshotEdgeDeserializable.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/storage/SnapshotEdgeDeserializable.scala b/s2core/src/main/scala/com/kakao/s2graph/core/storage/SnapshotEdgeDeserializable.scala deleted file mode 100644 index d4f55f0..0000000 --- a/s2core/src/main/scala/com/kakao/s2graph/core/storage/SnapshotEdgeDeserializable.scala +++ /dev/null @@ -1,142 +0,0 @@ -package com.kakao.s2graph.core.storage - -import com.kakao.s2graph.core.mysqls.{LabelIndex, LabelMeta} -import com.kakao.s2graph.core.storage.{CanSKeyValue, SKeyValue, StorageDeserializable} -import com.kakao.s2graph.core.types._ -import com.kakao.s2graph.core.{Edge, QueryParam, SnapshotEdge, Vertex} -import org.apache.hadoop.hbase.util.Bytes - -class SnapshotEdgeDeserializable extends Deserializable[SnapshotEdge] { - - import StorageDeserializable._ - - override def fromKeyValuesInner[T: CanSKeyValue](queryParam: QueryParam, - _kvs: Seq[T], - version: String, - cacheElementOpt: Option[SnapshotEdge]): SnapshotEdge = { - queryParam.label.schemaVersion match { - case HBaseType.VERSION2 | HBaseType.VERSION1 => 
fromKeyValuesInnerOld(queryParam, _kvs, version, cacheElementOpt) - case _ => fromKeyValuesInnerV3(queryParam, _kvs, version, cacheElementOpt) - } - } - - def statusCodeWithOp(byte: Byte): (Byte, Byte) = { - val statusCode = byte >> 4 - val op = byte & ((1 << 4) - 1) - (statusCode.toByte, op.toByte) - } - - def fromKeyValuesInnerOld[T: CanSKeyValue](queryParam: QueryParam, _kvs: Seq[T], version: String, cacheElementOpt: Option[SnapshotEdge]): SnapshotEdge = { - val kvs = _kvs.map { kv => implicitly[CanSKeyValue[T]].toSKeyValue(kv) } - assert(kvs.size == 1) - - val kv = kvs.head - val schemaVer = queryParam.label.schemaVersion - val cellVersion = kv.timestamp - - val (srcVertexId, labelWithDir, _, _, _) = cacheElementOpt.map { e => - (e.srcVertex.id, e.labelWithDir, LabelIndex.DefaultSeq, true, 0) - }.getOrElse(parseRow(kv, schemaVer)) - - val (tgtVertexId, props, op, ts, statusCode, _pendingEdgeOpt) = { - val (tgtVertexId, _) = TargetVertexId.fromBytes(kv.qualifier, 0, kv.qualifier.length, schemaVer) - var pos = 0 - val (statusCode, op) = statusCodeWithOp(kv.value(pos)) - pos += 1 - val (props, endAt) = bytesToKeyValuesWithTs(kv.value, pos, schemaVer) - val kvsMap = props.toMap - val ts = kvsMap(LabelMeta.timeStampSeq).innerVal.toString.toLong - - pos = endAt - val _pendingEdgeOpt = - if (pos == kv.value.length) None - else { - val (pendingEdgeStatusCode, pendingEdgeOp) = statusCodeWithOp(kv.value(pos)) - pos += 1 - // val versionNum = Bytes.toLong(kv.value, pos, 8) - // pos += 8 - val (pendingEdgeProps, endAt) = bytesToKeyValuesWithTs(kv.value, pos, schemaVer) - pos = endAt - val lockTs = Option(Bytes.toLong(kv.value, pos, 8)) - - val pendingEdge = - Edge(Vertex(srcVertexId, cellVersion), - Vertex(tgtVertexId, cellVersion), - labelWithDir, pendingEdgeOp, - cellVersion, pendingEdgeProps.toMap, - statusCode = pendingEdgeStatusCode, lockTs = lockTs) - Option(pendingEdge) - } - - (tgtVertexId, kvsMap, op, ts, statusCode, _pendingEdgeOpt) - } - - 
SnapshotEdge(Vertex(srcVertexId, ts), Vertex(tgtVertexId, ts), - labelWithDir, op, cellVersion, props, statusCode = statusCode, - pendingEdgeOpt = _pendingEdgeOpt, lockTs = None) - } - - private def fromKeyValuesInnerV3[T: CanSKeyValue](queryParam: QueryParam, _kvs: Seq[T], version: String, cacheElementOpt: Option[SnapshotEdge]): SnapshotEdge = { - val kvs = _kvs.map { kv => implicitly[CanSKeyValue[T]].toSKeyValue(kv) } - assert(kvs.size == 1) - - val kv = kvs.head - val schemaVer = queryParam.label.schemaVersion - val cellVersion = kv.timestamp - /** rowKey */ - def parseRowV3(kv: SKeyValue, version: String) = { - var pos = 0 - val (srcIdAndTgtId, srcIdAndTgtIdLen) = SourceAndTargetVertexIdPair.fromBytes(kv.row, pos, kv.row.length, version) - pos += srcIdAndTgtIdLen - val labelWithDir = LabelWithDirection(Bytes.toInt(kv.row, pos, 4)) - pos += 4 - val (labelIdxSeq, isInverted) = bytesToLabelIndexSeqWithIsInverted(kv.row, pos) - - val rowLen = srcIdAndTgtIdLen + 4 + 1 - (srcIdAndTgtId.srcInnerId, srcIdAndTgtId.tgtInnerId, labelWithDir, labelIdxSeq, isInverted, rowLen) - - } - val (srcInnerId, tgtInnerId, labelWithDir, _, _, _) = cacheElementOpt.map { e => - (e.srcVertex.innerId, e.tgtVertex.innerId, e.labelWithDir, LabelIndex.DefaultSeq, true, 0) - }.getOrElse(parseRowV3(kv, schemaVer)) - - val srcVertexId = SourceVertexId(HBaseType.DEFAULT_COL_ID, srcInnerId) - val tgtVertexId = SourceVertexId(HBaseType.DEFAULT_COL_ID, tgtInnerId) - - val (props, op, ts, statusCode, _pendingEdgeOpt) = { - var pos = 0 - val (statusCode, op) = statusCodeWithOp(kv.value(pos)) - pos += 1 - val (props, endAt) = bytesToKeyValuesWithTs(kv.value, pos, schemaVer) - val kvsMap = props.toMap - val ts = kvsMap(LabelMeta.timeStampSeq).innerVal.toString.toLong - - pos = endAt - val _pendingEdgeOpt = - if (pos == kv.value.length) None - else { - val (pendingEdgeStatusCode, pendingEdgeOp) = statusCodeWithOp(kv.value(pos)) - pos += 1 - // val versionNum = Bytes.toLong(kv.value, pos, 8) - // pos += 
8 - val (pendingEdgeProps, endAt) = bytesToKeyValuesWithTs(kv.value, pos, schemaVer) - pos = endAt - val lockTs = Option(Bytes.toLong(kv.value, pos, 8)) - - val pendingEdge = - Edge(Vertex(srcVertexId, cellVersion), - Vertex(tgtVertexId, cellVersion), - labelWithDir, pendingEdgeOp, - cellVersion, pendingEdgeProps.toMap, - statusCode = pendingEdgeStatusCode, lockTs = lockTs) - Option(pendingEdge) - } - - (kvsMap, op, ts, statusCode, _pendingEdgeOpt) - } - - SnapshotEdge(Vertex(srcVertexId, ts), Vertex(tgtVertexId, ts), - labelWithDir, op, cellVersion, props, statusCode = statusCode, - pendingEdgeOpt = _pendingEdgeOpt, lockTs = None) - } -} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/fd8119bc/s2core/src/main/scala/com/kakao/s2graph/core/storage/SnapshotEdgeSerializable.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/storage/SnapshotEdgeSerializable.scala b/s2core/src/main/scala/com/kakao/s2graph/core/storage/SnapshotEdgeSerializable.scala deleted file mode 100644 index 9e6e1b7..0000000 --- a/s2core/src/main/scala/com/kakao/s2graph/core/storage/SnapshotEdgeSerializable.scala +++ /dev/null @@ -1,76 +0,0 @@ -package com.kakao.s2graph.core.storage - -import com.kakao.s2graph.core.SnapshotEdge -import com.kakao.s2graph.core.mysqls.LabelIndex -import com.kakao.s2graph.core.types.{HBaseType, SourceAndTargetVertexIdPair, VertexId} -import org.apache.hadoop.hbase.util.Bytes - - -class SnapshotEdgeSerializable(snapshotEdge: SnapshotEdge) extends Serializable[SnapshotEdge] { - import StorageSerializable._ - - val label = snapshotEdge.label - val table = label.hbaseTableName.getBytes() - val cf = Serializable.edgeCf - - def statusCodeWithOp(statusCode: Byte, op: Byte): Array[Byte] = { - val byte = (((statusCode << 4) | op).toByte) - Array.fill(1)(byte.toByte) - } - def valueBytes() = Bytes.add(statusCodeWithOp(snapshotEdge.statusCode, snapshotEdge.op), - 
propsToKeyValuesWithTs(snapshotEdge.props.toList)) - - override def toKeyValues: Seq[SKeyValue] = { - label.schemaVersion match { - case HBaseType.VERSION1 | HBaseType.VERSION2 => toKeyValuesInner - case _ => toKeyValuesInnerV3 - } - } - - private def toKeyValuesInner: Seq[SKeyValue] = { - val srcIdBytes = VertexId.toSourceVertexId(snapshotEdge.srcVertex.id).bytes - val labelWithDirBytes = snapshotEdge.labelWithDir.bytes - val labelIndexSeqWithIsInvertedBytes = labelOrderSeqWithIsInverted(LabelIndex.DefaultSeq, isInverted = true) - - val row = Bytes.add(srcIdBytes, labelWithDirBytes, labelIndexSeqWithIsInvertedBytes) - val tgtIdBytes = VertexId.toTargetVertexId(snapshotEdge.tgtVertex.id).bytes - - val qualifier = tgtIdBytes - - val value = snapshotEdge.pendingEdgeOpt match { - case None => valueBytes() - case Some(pendingEdge) => - val opBytes = statusCodeWithOp(pendingEdge.statusCode, pendingEdge.op) - val versionBytes = Array.empty[Byte] - val propsBytes = propsToKeyValuesWithTs(pendingEdge.propsWithTs.toSeq) - val lockBytes = Bytes.toBytes(pendingEdge.lockTs.get) - Bytes.add(Bytes.add(valueBytes(), opBytes, versionBytes), Bytes.add(propsBytes, lockBytes)) - } - val kv = SKeyValue(table, row, cf, qualifier, value, snapshotEdge.version) - Seq(kv) - } - - private def toKeyValuesInnerV3: Seq[SKeyValue] = { - val srcIdAndTgtIdBytes = SourceAndTargetVertexIdPair(snapshotEdge.srcVertex.innerId, snapshotEdge.tgtVertex.innerId).bytes - val labelWithDirBytes = snapshotEdge.labelWithDir.bytes - val labelIndexSeqWithIsInvertedBytes = labelOrderSeqWithIsInverted(LabelIndex.DefaultSeq, isInverted = true) - - val row = Bytes.add(srcIdAndTgtIdBytes, labelWithDirBytes, labelIndexSeqWithIsInvertedBytes) - - val qualifier = Array.empty[Byte] - - val value = snapshotEdge.pendingEdgeOpt match { - case None => valueBytes() - case Some(pendingEdge) => - val opBytes = statusCodeWithOp(pendingEdge.statusCode, pendingEdge.op) - val versionBytes = Array.empty[Byte] - val propsBytes = 
propsToKeyValuesWithTs(pendingEdge.propsWithTs.toSeq) - val lockBytes = Bytes.toBytes(pendingEdge.lockTs.get) - - Bytes.add(Bytes.add(valueBytes(), opBytes, versionBytes), Bytes.add(propsBytes, lockBytes)) - } - - val kv = SKeyValue(table, row, cf, qualifier, value, snapshotEdge.version) - Seq(kv) - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/fd8119bc/s2core/src/main/scala/com/kakao/s2graph/core/storage/Storage.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/storage/Storage.scala b/s2core/src/main/scala/com/kakao/s2graph/core/storage/Storage.scala index bca8df3..cc8f13c 100644 --- a/s2core/src/main/scala/com/kakao/s2graph/core/storage/Storage.scala +++ b/s2core/src/main/scala/com/kakao/s2graph/core/storage/Storage.scala @@ -4,6 +4,10 @@ import com.kakao.s2graph.core.ExceptionHandler.{Key, Val, KafkaMessage} import com.kakao.s2graph.core.GraphExceptions.FetchTimeoutException import com.kakao.s2graph.core._ import com.kakao.s2graph.core.mysqls._ +import com.kakao.s2graph.core.storage.serde._ +import com.kakao.s2graph.core.storage.serde.snapshotedge.tall +import com.kakao.s2graph.core.storage.serde.snapshotedge.wide.SnapshotEdgeDeserializable +import com.kakao.s2graph.core.storage.serde.vertex._ import com.kakao.s2graph.core.types._ import com.kakao.s2graph.core.utils.{Extensions, logger} import com.typesafe.config.Config @@ -16,6 +20,8 @@ import scala.concurrent.{ExecutionContext, Future} import scala.util.{Random, Try} abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) { + import HBaseType._ + /** storage dependent configurations */ val MaxRetryNum = config.getInt("max.retry.number") val MaxBackOff = config.getInt("max.back.off") @@ -26,6 +32,14 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) { val expireAfterWrite = config.getInt("future.cache.expire.after.write") val 
expireAfterAccess = config.getInt("future.cache.expire.after.access") + /** + * Compatibility table + * | label schema version | snapshot edge | index edge | vertex | note | + * | v1 | serde.snapshotedge.wide | serde.indexedge.wide | serde.vertex | do not use this. this exist only for backward compatibility issue | + * | v2 | serde.snapshotedge.wide | serde.indexedge.wide | serde.vertex | do not use this. this exist only for backward compatibility issue | + * | v3 | serde.snapshotedge.tall | serde.indexedge.wide | serde.vertex | recommended with HBase. current stable schema | + * + */ /** * create serializer that knows how to convert given snapshotEdge into kvs: Seq[SKeyValue] @@ -33,14 +47,26 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) { * @param snapshotEdge: snapshotEdge to serialize * @return serializer implementation for StorageSerializable which has toKeyValues return Seq[SKeyValue] */ - def snapshotEdgeSerializer(snapshotEdge: SnapshotEdge) = new SnapshotEdgeSerializable(snapshotEdge) + def snapshotEdgeSerializer(snapshotEdge: SnapshotEdge) = { + snapshotEdge.schemaVer match { + case VERSION1 | VERSION2 => new serde.snapshotedge.wide.SnapshotEdgeSerializable(snapshotEdge) + case VERSION3 => new serde.snapshotedge.tall.SnapshotEdgeSerializable(snapshotEdge) + case _ => throw new RuntimeException(s"not supported version: ${snapshotEdge.schemaVer}") + } + } /** * create serializer that knows how to convert given indexEdge into kvs: Seq[SKeyValue] - * @param indexedEdge: indexEdge to serialize + * @param indexEdge: indexEdge to serialize * @return serializer implementation */ - def indexEdgeSerializer(indexedEdge: IndexEdge) = new IndexEdgeSerializable(indexedEdge) + def indexEdgeSerializer(indexEdge: IndexEdge) = { + indexEdge.schemaVer match { + case VERSION1 | VERSION2 | VERSION3 => new indexedge.wide.IndexEdgeSerializable(indexEdge) + case _ => throw new RuntimeException(s"not supported version: ${indexEdge.schemaVer}") + 
+ } + } /** * create serializer that knows how to convert given vertex into kvs: Seq[SKeyValue] @@ -58,10 +84,24 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) { * if any storaage use different class to represent stored byte array, * then that storage implementation is responsible to provide implicit type conversion method on CanSKeyValue. * */ - val snapshotEdgeDeserializer = new SnapshotEdgeDeserializable + + val snapshotEdgeDeserializers = Map( + VERSION1 -> new snapshotedge.wide.SnapshotEdgeDeserializable, + VERSION2 -> new snapshotedge.wide.SnapshotEdgeDeserializable, + VERSION3 -> new tall.SnapshotEdgeDeserializable + ) + def snapshotEdgeDeserializer(schemaVer: String) = + snapshotEdgeDeserializers.get(schemaVer).getOrElse(throw new RuntimeException(s"not supported version: ${schemaVer}")) /** create deserializer that can parse stored CanSKeyValue into indexEdge. */ - val indexEdgeDeserializer = new IndexEdgeDeserializable + val indexEdgeDeserializers = Map( + VERSION1 -> new indexedge.wide.IndexEdgeDeserializable, + VERSION2 -> new indexedge.wide.IndexEdgeDeserializable, + VERSION3 -> new indexedge.wide.IndexEdgeDeserializable + ) + + def indexEdgeDeserializer(schemaVer: String) = + indexEdgeDeserializers.get(schemaVer).getOrElse(throw new RuntimeException(s"not supported version: ${schemaVer}")) /** create deserializer that can parser stored CanSKeyValue into vertex. 
*/ val vertexDeserializer = new VertexDeserializable @@ -587,7 +627,8 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) { parentEdges: Seq[EdgeWithScore]): Option[Edge] = { // logger.debug(s"toEdge: $kv") try { - val indexEdgeOpt = indexEdgeDeserializer.fromKeyValues(queryParam, Seq(kv), queryParam.label.schemaVersion, cacheElementOpt) + val schemaVer = queryParam.label.schemaVersion + val indexEdgeOpt = indexEdgeDeserializer(schemaVer).fromKeyValues(queryParam, Seq(kv), queryParam.label.schemaVersion, cacheElementOpt) indexEdgeOpt.map(indexEdge => indexEdge.toEdge.copy(parentEdges = parentEdges)) } catch { case ex: Exception => @@ -602,7 +643,8 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) { isInnerCall: Boolean, parentEdges: Seq[EdgeWithScore]): Option[Edge] = { // logger.debug(s"SnapshottoEdge: $kv") - val snapshotEdgeOpt = snapshotEdgeDeserializer.fromKeyValues(queryParam, Seq(kv), queryParam.label.schemaVersion, cacheElementOpt) + val schemaVer = queryParam.label.schemaVersion + val snapshotEdgeOpt = snapshotEdgeDeserializer(schemaVer).fromKeyValues(queryParam, Seq(kv), queryParam.label.schemaVersion, cacheElementOpt) if (isInnerCall) { snapshotEdgeOpt.flatMap { snapshotEdge => @@ -631,9 +673,10 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) { else { val first = kvs.head val kv = first + val schemaVer = queryParam.label.schemaVersion val cacheElementOpt = if (queryParam.isSnapshotEdge) None - else indexEdgeDeserializer.fromKeyValues(queryParam, Seq(kv), queryParam.label.schemaVersion, None) + else indexEdgeDeserializer(schemaVer).fromKeyValues(queryParam, Seq(kv), queryParam.label.schemaVersion, None) for { kv <- kvs http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/fd8119bc/s2core/src/main/scala/com/kakao/s2graph/core/storage/VertexDeserializable.scala ---------------------------------------------------------------------- diff --git 
a/s2core/src/main/scala/com/kakao/s2graph/core/storage/VertexDeserializable.scala b/s2core/src/main/scala/com/kakao/s2graph/core/storage/VertexDeserializable.scala deleted file mode 100644 index 699981d..0000000 --- a/s2core/src/main/scala/com/kakao/s2graph/core/storage/VertexDeserializable.scala +++ /dev/null @@ -1,46 +0,0 @@ -package com.kakao.s2graph.core.storage - -import com.kakao.s2graph.core.types.{InnerVal, InnerValLike, VertexId} -import com.kakao.s2graph.core.{QueryParam, Vertex} -import org.apache.hadoop.hbase.util.Bytes - -import scala.collection.mutable.ListBuffer - -class VertexDeserializable extends Deserializable[Vertex] { - def fromKeyValuesInner[T: CanSKeyValue](queryParam: QueryParam, - _kvs: Seq[T], - version: String, - cacheElementOpt: Option[Vertex]): Vertex = { - - val kvs = _kvs.map { kv => implicitly[CanSKeyValue[T]].toSKeyValue(kv) } - - val kv = kvs.head - val (vertexId, _) = VertexId.fromBytes(kv.row, 0, kv.row.length, version) - - var maxTs = Long.MinValue - val propsMap = new collection.mutable.HashMap[Int, InnerValLike] - val belongLabelIds = new ListBuffer[Int] - - for { - kv <- kvs - } { - val propKey = - if (kv.qualifier.length == 1) kv.qualifier.head.toInt - else Bytes.toInt(kv.qualifier) - - val ts = kv.timestamp - if (ts > maxTs) maxTs = ts - - if (Vertex.isLabelId(propKey)) { - belongLabelIds += Vertex.toLabelId(propKey) - } else { - val v = kv.value - val (value, _) = InnerVal.fromBytes(v, 0, v.length, version) - propsMap += (propKey -> value) - } - } - assert(maxTs != Long.MinValue) - Vertex(vertexId, maxTs, propsMap.toMap, belongLabelIds = belongLabelIds) - } -} - http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/fd8119bc/s2core/src/main/scala/com/kakao/s2graph/core/storage/VertexSerializable.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/storage/VertexSerializable.scala 
b/s2core/src/main/scala/com/kakao/s2graph/core/storage/VertexSerializable.scala deleted file mode 100644 index bda909d..0000000 --- a/s2core/src/main/scala/com/kakao/s2graph/core/storage/VertexSerializable.scala +++ /dev/null @@ -1,18 +0,0 @@ -package com.kakao.s2graph.core.storage - -import com.kakao.s2graph.core.Vertex -import org.apache.hadoop.hbase.util.Bytes - -case class VertexSerializable(vertex: Vertex) extends Serializable[Vertex] { - - val cf = Serializable.vertexCf - - override def toKeyValues: Seq[SKeyValue] = { - val row = vertex.id.bytes - val base = for ((k, v) <- vertex.props ++ vertex.defaultProps) yield Bytes.toBytes(k) -> v.bytes - val belongsTo = vertex.belongLabelIds.map { labelId => Bytes.toBytes(Vertex.toPropKey(labelId)) -> Array.empty[Byte] } - (base ++ belongsTo).map { case (qualifier, value) => - SKeyValue(vertex.hbaseTableName.getBytes, row, cf, qualifier, value, vertex.ts) - } toSeq - } -} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/fd8119bc/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala b/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala new file mode 100644 index 0000000..014a5c9 --- /dev/null +++ b/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala @@ -0,0 +1,132 @@ +package com.kakao.s2graph.core.storage.serde.indexedge.tall + +import com.kakao.s2graph.core.mysqls.LabelMeta +import com.kakao.s2graph.core.storage.StorageDeserializable._ +import com.kakao.s2graph.core.storage.{CanSKeyValue, Deserializable, SKeyValue, StorageDeserializable} +import com.kakao.s2graph.core.types._ +import com.kakao.s2graph.core.{GraphUtil, IndexEdge, QueryParam, Vertex} +import 
org.apache.hadoop.hbase.util.Bytes
+
+/**
+ * Reads an [[IndexEdge]] back from the "tall" layout, in which the row carries the whole
+ * edge key and the qualifier is never consulted.
+ *
+ * Row, in the order it is read here: source vertex id, a 4-byte label-with-direction int,
+ * one byte holding the label index seq (plus inverted flag), one op byte, and then either
+ * nothing (a degree cell) or the index props followed by an optional target vertex id.
+ *
+ * @param bytesToLongFunc reads a long from (bytes, offset); used for degree and count values
+ */
+class IndexEdgeDeserializable(bytesToLongFunc: (Array[Byte], Int) => Long = bytesToLong) extends Deserializable[IndexEdge] {
+  import StorageDeserializable._
+
+  // Not referenced inside this class; retained because they are public type members.
+  type QualifierRaw = (Array[(Byte, InnerValLike)], VertexId, Byte, Boolean, Int)
+  type ValueRaw = (Array[(Byte, InnerValLike)], Int)
+
+  /**
+   * Rebuilds the index edge stored in a single key/value.
+   *
+   * @param queryParam      supplies the label whose `indicesMap` resolves the label index seq
+   *                        (only read for non-degree cells)
+   * @param _kvs            exactly one element convertible to [[SKeyValue]]
+   * @param version         schema version handed to the id / value decoders
+   * @param cacheElementOpt not read by this implementation
+   * @return the decoded edge; source vertex, target vertex and edge all carry the cell timestamp
+   * @throws RuntimeException if the label has no index registered for the decoded index seq
+   */
+  override def fromKeyValuesInner[T: CanSKeyValue](queryParam: QueryParam,
+                                                   _kvs: Seq[T],
+                                                   version: String,
+                                                   cacheElementOpt: Option[IndexEdge]): IndexEdge = {
+
+    assert(_kvs.size == 1)
+
+    val kvs = _kvs.map { kv => implicitly[CanSKeyValue[T]].toSKeyValue(kv) }
+
+    val kv = kvs.head
+    val ts = kv.timestamp
+
+    // logger.debug(s"[Des]: ${kv.row.toList}, ${kv.qualifier.toList}, ${kv.value.toList}")
+    // Walk the row left to right; `pos` always points at the next unread byte.
+    var pos = 0
+    val (srcVertexId, srcIdLen) = SourceVertexId.fromBytes(kv.row, pos, kv.row.length, version)
+    pos += srcIdLen
+    val labelWithDir = LabelWithDirection(Bytes.toInt(kv.row, pos, 4))
+    pos += 4
+    val (labelIdxSeq, _) = bytesToLabelIndexSeqWithIsInverted(kv.row, pos)
+    pos += 1
+
+    val op = kv.row(pos)
+    pos += 1
+
+    if (pos == kv.row.length) {
+      // Nothing follows the op byte: this is a degree cell and the value is the degree itself.
+      val degreeVal = bytesToLongFunc(kv.value, 0)
+      val props = Map(LabelMeta.timeStampSeq -> InnerVal.withLong(ts, version),
+        LabelMeta.degreeSeq -> InnerVal.withLong(degreeVal, version))
+      val tgtVertexId = VertexId(HBaseType.DEFAULT_COL_ID, InnerVal.withStr("0", version))
+      IndexEdge(Vertex(srcVertexId, ts), Vertex(tgtVertexId, ts), labelWithDir, op, ts, labelIdxSeq, props)
+    } else {
+      // not degree edge
+      val index = queryParam.label.indicesMap.getOrElse(labelIdxSeq, throw new RuntimeException(s"invalid index seq: ${queryParam.label.id.get}, ${labelIdxSeq}"))
+
+      val (idxPropsRaw, endAt) = bytesToProps(kv.row, pos, version)
+      // The target vertex id is only present when bytes remain after the index props.
+      val (tgtVertexIdRaw, _) = if (endAt == kv.row.length) {
+        (HBaseType.defaultTgtVertexId, 0)
+      } else {
+        TargetVertexId.fromBytes(kv.row, endAt, kv.row.length, version)
+      }
+
+      // Stored prop keys are replaced by the index's own meta seqs, except for the degree marker.
+      val idxProps = for {
+        (seq, (k, v)) <- index.metaSeqs.zip(idxPropsRaw)
+      } yield if (k == LabelMeta.degreeSeq) k -> v else seq -> v
+
+      val idxPropsMap = idxProps.toMap
+
+      // When `_to` is itself an indexed prop, it is the authoritative target id.
+      val tgtVertexId =
+        idxPropsMap.get(LabelMeta.toSeq) match {
+          case None => tgtVertexIdRaw
+          case Some(vId) => TargetVertexId(HBaseType.DEFAULT_COL_ID, vId)
+        }
+
+      // NOTE(review): the serializer in this package always writes GraphUtil.defaultOpByte into
+      // the row, so whether this branch is ever taken depends on that constant -- confirm.
+      val (props, _) = if (op == GraphUtil.operations("incrementCount")) {
+        val countVal = bytesToLongFunc(kv.value, 0)
+        val dummyProps = Array(LabelMeta.countSeq -> InnerVal.withLong(countVal, version))
+        (dummyProps, 8)
+      } else {
+        bytesToKeyValues(kv.value, 0, kv.value.length, version)
+      }
+
+      val _mergedProps = (idxProps ++ props).toMap
+      // Fall back to the cell timestamp when the stored props carry no explicit timestamp.
+      val mergedProps =
+        if (_mergedProps.contains(LabelMeta.timeStampSeq)) _mergedProps
+        else _mergedProps + (LabelMeta.timeStampSeq -> InnerVal.withLong(kv.timestamp, version))
+
+      IndexEdge(Vertex(srcVertexId, ts), Vertex(tgtVertexId, ts), labelWithDir, op, ts, labelIdxSeq, mergedProps)
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/fd8119bc/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/indexedge/tall/IndexEdgeSerializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/indexedge/tall/IndexEdgeSerializable.scala b/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/indexedge/tall/IndexEdgeSerializable.scala
new file mode 100644
index 0000000..46ad15f
--- /dev/null
+++ b/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/indexedge/tall/IndexEdgeSerializable.scala
@@ -0,0 +1,53 @@
+package com.kakao.s2graph.core.storage.serde.indexedge.tall
+
+import com.kakao.s2graph.core.mysqls.LabelMeta
+import com.kakao.s2graph.core.storage.{SKeyValue, Serializable, StorageSerializable}
+import com.kakao.s2graph.core.types.VertexId
+import com.kakao.s2graph.core.{GraphUtil, IndexEdge}
+import org.apache.hadoop.hbase.util.Bytes
+
+
+/**
+ * Writes an [[IndexEdge]] in the "tall" layout: everything that identifies the edge goes
+ * into the row and the qualifier is left empty.
+ *
+ * @param indexEdge the edge to serialize
+ */
+class IndexEdgeSerializable(indexEdge: IndexEdge) extends Serializable[IndexEdge] {
+  import StorageSerializable._
+
+  val label = indexEdge.label
+  // Bytes.toBytes encodes as UTF-8 regardless of the JVM's default charset.
+  val table = Bytes.toBytes(label.hbaseTableName)
+  val cf = Serializable.edgeCf
+
+  val idxPropsMap = indexEdge.orders.toMap
+  val idxPropsBytes = propsToBytes(indexEdge.orders)
+
+  /**
+   * Builds the single cell for this edge.
+   *
+   * Row: source vertex id + label-with-direction + label index seq (not inverted) + one op
+   * byte + key suffix. The suffix is empty for a degree edge, otherwise the index props,
+   * followed by the target vertex id unless `_to` is already one of the indexed props.
+   * Value: the degree as a long for a degree edge, the count as a long for an
+   * `incrementCount` edge, otherwise the encoded meta props.
+   *
+   * @return a one-element sequence; the cell is stamped with `indexEdge.version`
+   */
+  override def toKeyValues: Seq[SKeyValue] = {
+    val srcIdBytes = VertexId.toSourceVertexId(indexEdge.srcVertex.id).bytes
+    val labelWithDirBytes = indexEdge.labelWithDir.bytes
+    val labelIndexSeqWithIsInvertedBytes = labelOrderSeqWithIsInverted(indexEdge.labelIndexSeq, isInverted = false)
+
+    val row = Bytes.add(srcIdBytes, labelWithDirBytes, labelIndexSeqWithIsInvertedBytes)
+    // logger.error(s"${row.toList}\n${srcIdBytes.toList}\n${labelWithDirBytes.toList}\n${labelIndexSeqWithIsInvertedBytes.toList}")
+
+    val qualifier =
+      if (indexEdge.degreeEdge) Array.empty[Byte]
+      else
+        idxPropsMap.get(LabelMeta.toSeq) match {
+          case None => Bytes.add(idxPropsBytes, VertexId.toTargetVertexId(indexEdge.tgtVertex.id).bytes)
+          case Some(_) => idxPropsBytes
+        }
+
+    /** TODO search usage of op byte. if there is no, then remove opByte */
+    // NOTE(review): the row always carries the default op byte, not indexEdge.op, while the
+    // value below is a bare long for incrementCount edges -- confirm readers can tell them apart.
+    val rowBytes = Bytes.add(row, Array.fill(1)(GraphUtil.defaultOpByte), qualifier)
+    // val qualifierBytes = Array.fill(1)(indexEdge.op)
+    val qualifierBytes = Array.empty[Byte]
+
+    val value =
+      if (indexEdge.degreeEdge)
+        Bytes.toBytes(indexEdge.propsWithTs(LabelMeta.degreeSeq).innerVal.toString().toLong)
+      else if (indexEdge.op == GraphUtil.operations("incrementCount"))
+        Bytes.toBytes(indexEdge.propsWithTs(LabelMeta.countSeq).innerVal.toString().toLong)
+      else propsToKeyValues(indexEdge.metas.toSeq)
+
+    val kv = SKeyValue(table, rowBytes, cf, qualifierBytes, value, indexEdge.version)
+
+    // logger.debug(s"[Ser]: ${kv.row.toList}, ${kv.qualifier.toList}, ${kv.value.toList}")
+    Seq(kv)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/fd8119bc/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala b/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala
new file mode 100644
index 0000000..f83dd1f
--- 
/dev/null
+++ b/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala
@@ -0,0 +1,116 @@
+package com.kakao.s2graph.core.storage.serde.indexedge.wide
+
+import com.kakao.s2graph.core.mysqls.LabelMeta
+import com.kakao.s2graph.core.storage.StorageDeserializable._
+import com.kakao.s2graph.core.storage.{CanSKeyValue, Deserializable, SKeyValue, StorageDeserializable}
+import com.kakao.s2graph.core.types._
+import com.kakao.s2graph.core.{GraphUtil, IndexEdge, QueryParam, Vertex}
+
+/**
+ * Reads an [[IndexEdge]] back from the "wide" layout: the row identifies source vertex,
+ * label/direction and index, while the qualifier carries the index props, an optional
+ * target vertex id and an optional trailing op byte. An empty qualifier marks a degree cell.
+ *
+ * @param bytesToLongFunc reads a long from (bytes, offset); used for degree and count values
+ */
+class IndexEdgeDeserializable(bytesToLongFunc: (Array[Byte], Int) => Long = bytesToLong) extends Deserializable[IndexEdge] {
+  import StorageDeserializable._
+
+  /** (index props, target vertex id, op, target id was present in the qualifier, bytes consumed) */
+  type QualifierRaw = (Array[(Byte, InnerValLike)], VertexId, Byte, Boolean, Int)
+  /** (props, end offset) */
+  type ValueRaw = (Array[(Byte, InnerValLike)], Int)
+
+  /** Degree cell: the degree is read from the value; target id is the placeholder "0" and op is insert. */
+  private def parseDegreeQualifier(kv: SKeyValue, version: String): QualifierRaw = {
+    // val degree = Bytes.toLong(kv.value)
+    val degree = bytesToLongFunc(kv.value, 0)
+    val idxPropsRaw = Array(LabelMeta.degreeSeq -> InnerVal.withLong(degree, version))
+    val tgtVertexIdRaw = VertexId(HBaseType.DEFAULT_COL_ID, InnerVal.withStr("0", version))
+    (idxPropsRaw, tgtVertexIdRaw, GraphUtil.operations("insert"), false, 0)
+  }
+
+  /** Regular cell: index props, then a target vertex id if bytes remain, then an op byte if one more remains. */
+  private def parseQualifier(kv: SKeyValue, version: String): QualifierRaw = {
+    val qualifier = kv.qualifier
+    val (idxPropsRaw, propsEndAt) = bytesToProps(qualifier, 0, version)
+    val (tgtVertexIdRaw, tgtVertexIdLen) = if (propsEndAt == qualifier.length) {
+      (HBaseType.defaultTgtVertexId, 0)
+    } else {
+      TargetVertexId.fromBytes(qualifier, propsEndAt, qualifier.length, version)
+    }
+    // Offset of the optional op byte: right after the props and the (possibly absent) target id.
+    val opAt = propsEndAt + tgtVertexIdLen
+    val (op, opLen) =
+      if (qualifier.length == opAt) (GraphUtil.defaultOpByte, 0)
+      else (qualifier(opAt), 1)
+
+    (idxPropsRaw, tgtVertexIdRaw, op, tgtVertexIdLen != 0, opAt + opLen)
+  }
+
+  /** Decodes the whole value as key/value props. */
+  private def parseValue(kv: SKeyValue, version: String): ValueRaw = {
+    val (props, endAt) = bytesToKeyValues(kv.value, 0, kv.value.length, version)
+    (props, endAt)
+  }
+
+  /** A degree cell contributes no props from its value; the degree is handled with the qualifier. */
+  private def parseDegreeValue(kv: SKeyValue, version: String): ValueRaw = {
+    (Array.empty[(Byte, InnerValLike)], 0)
+  }
+
+  /**
+   * Rebuilds the index edge stored in a single key/value.
+   *
+   * @param queryParam      supplies the label whose `indicesMap` resolves the label index seq
+   * @param _kvs            exactly one element convertible to [[SKeyValue]]
+   * @param version         schema version handed to the id / value decoders
+   * @param cacheElementOpt when defined, its source vertex id, label/direction and index seq are
+   *                        reused instead of parsing the row
+   * @return the decoded edge; source vertex, target vertex and edge all carry the cell timestamp
+   * @throws RuntimeException if the label has no index registered for the index seq
+   */
+  override def fromKeyValuesInner[T: CanSKeyValue](queryParam: QueryParam,
+                                                   _kvs: Seq[T],
+                                                   version: String,
+                                                   cacheElementOpt: Option[IndexEdge]): IndexEdge = {
+    assert(_kvs.size == 1)
+
+    val kvs = _kvs.map { kv => implicitly[CanSKeyValue[T]].toSKeyValue(kv) }
+
+    val kv = kvs.head
+    val (srcVertexId, labelWithDir, labelIdxSeq, _, _) = cacheElementOpt.map { e =>
+      (e.srcVertex.id, e.labelWithDir, e.labelIndexSeq, false, 0)
+    }.getOrElse(parseRow(kv, version))
+
+    val (idxPropsRaw, tgtVertexIdRaw, op, _, _) =
+      if (kv.qualifier.isEmpty) parseDegreeQualifier(kv, version)
+      else parseQualifier(kv, version)
+
+    val (props, _) = if (op == GraphUtil.operations("incrementCount")) {
+      // val countVal = Bytes.toLong(kv.value)
+      val countVal = bytesToLongFunc(kv.value, 0)
+      val dummyProps = Array(LabelMeta.countSeq -> InnerVal.withLong(countVal, version))
+      (dummyProps, 8)
+    } else if (kv.qualifier.isEmpty) {
+      parseDegreeValue(kv, version)
+    } else {
+      parseValue(kv, version)
+    }
+
+    val index = queryParam.label.indicesMap.getOrElse(labelIdxSeq, throw new RuntimeException(s"invalid index seq: ${queryParam.label.id.get}, ${labelIdxSeq}"))
+
+
+    // assert(kv.qualifier.nonEmpty && index.metaSeqs.size == idxPropsRaw.size)
+
+    // Stored prop keys are replaced by the index's own meta seqs, except for the degree marker.
+    val idxProps = for {
+      (seq, (k, v)) <- index.metaSeqs.zip(idxPropsRaw)
+    } yield {
+      if (k == LabelMeta.degreeSeq) k -> v
+      else seq -> v
+    }
+
+    val idxPropsMap = idxProps.toMap
+    // Prefer the `_to` value carried in the index props. The serializer leaves the target id out
+    // of the qualifier when `_to` is an indexed prop (and the op is not incrementCount), so gating
+    // this lookup on "target id present in the qualifier" would yield the default id in that case.
+    val tgtVertexId =
+      idxPropsMap.get(LabelMeta.toSeq) match {
+        case None => tgtVertexIdRaw
+        case Some(vId) => TargetVertexId(HBaseType.DEFAULT_COL_ID, vId)
+      }
+
+    val _mergedProps = (idxProps ++ props).toMap
+    // Fall back to the cell timestamp when the stored props carry no explicit timestamp.
+    val mergedProps =
+      if (_mergedProps.contains(LabelMeta.timeStampSeq)) _mergedProps
+      else _mergedProps + (LabelMeta.timeStampSeq -> InnerVal.withLong(kv.timestamp, version))
+
+    // logger.error(s"$mergedProps")
+    // val ts = mergedProps(LabelMeta.timeStampSeq).toString().toLong
+
+    val ts = kv.timestamp
+    IndexEdge(Vertex(srcVertexId, ts), Vertex(tgtVertexId, ts), labelWithDir, op, ts, labelIdxSeq, mergedProps)
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/fd8119bc/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/indexedge/wide/IndexEdgeSerializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/indexedge/wide/IndexEdgeSerializable.scala b/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/indexedge/wide/IndexEdgeSerializable.scala
new file mode 100644
index 0000000..716b6fb
--- /dev/null
+++ b/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/indexedge/wide/IndexEdgeSerializable.scala
@@ -0,0 +1,53 @@
+package com.kakao.s2graph.core.storage.serde.indexedge.wide
+
+import com.kakao.s2graph.core.mysqls.LabelMeta
+import com.kakao.s2graph.core.storage.{SKeyValue, Serializable, StorageSerializable}
+import com.kakao.s2graph.core.types.VertexId
+import com.kakao.s2graph.core.{GraphUtil, IndexEdge}
+import org.apache.hadoop.hbase.util.Bytes
+
+
+/**
+ * Writes an [[IndexEdge]] in the "wide" layout: the row identifies source vertex,
+ * label/direction and index, and the qualifier identifies the individual edge.
+ *
+ * @param indexEdge the edge to serialize
+ */
+class IndexEdgeSerializable(indexEdge: IndexEdge) extends Serializable[IndexEdge] {
+  import StorageSerializable._
+
+  val label = indexEdge.label
+  // Bytes.toBytes encodes as UTF-8 regardless of the JVM's default charset.
+  val table = Bytes.toBytes(label.hbaseTableName)
+  val cf = Serializable.edgeCf
+
+  val idxPropsMap = indexEdge.orders.toMap
+  val idxPropsBytes = propsToBytes(indexEdge.orders)
+
+  /**
+   * Builds the single cell for this edge.
+   *
+   * Row: source vertex id + label-with-direction + label index seq (not inverted).
+   * Qualifier: empty for a degree edge; index props + target vertex id + op byte for an
+   * `incrementCount` edge; otherwise the index props, followed by the target vertex id
+   * unless `_to` is already one of the indexed props.
+   * Value: the degree or count as a long for degree / `incrementCount` edges, otherwise
+   * the encoded meta props.
+   *
+   * @return a one-element sequence; the cell is stamped with `indexEdge.version`
+   */
+  override def toKeyValues: Seq[SKeyValue] = {
+    val srcIdBytes = VertexId.toSourceVertexId(indexEdge.srcVertex.id).bytes
+    val labelWithDirBytes = indexEdge.labelWithDir.bytes
+    val labelIndexSeqWithIsInvertedBytes = labelOrderSeqWithIsInverted(indexEdge.labelIndexSeq, isInverted = false)
+
+    val row = Bytes.add(srcIdBytes, labelWithDirBytes, labelIndexSeqWithIsInvertedBytes)
+    // logger.error(s"${row.toList}\n${srcIdBytes.toList}\n${labelWithDirBytes.toList}\n${labelIndexSeqWithIsInvertedBytes.toList}")
+    val tgtIdBytes = VertexId.toTargetVertexId(indexEdge.tgtVertex.id).bytes
+    val qualifier =
+      if (indexEdge.degreeEdge) Array.empty[Byte]
+      else {
+        if (indexEdge.op == GraphUtil.operations("incrementCount")) {
+          Bytes.add(idxPropsBytes, tgtIdBytes, Array.fill(1)(indexEdge.op))
+        } else {
+          idxPropsMap.get(LabelMeta.toSeq) match {
+            case None => Bytes.add(idxPropsBytes, tgtIdBytes)
+            case Some(_) => idxPropsBytes
+          }
+        }
+      }
+
+
+    val value =
+      if (indexEdge.degreeEdge)
+        Bytes.toBytes(indexEdge.propsWithTs(LabelMeta.degreeSeq).innerVal.toString().toLong)
+      else if (indexEdge.op == GraphUtil.operations("incrementCount"))
+        Bytes.toBytes(indexEdge.propsWithTs(LabelMeta.countSeq).innerVal.toString().toLong)
+      else propsToKeyValues(indexEdge.metas.toSeq)
+
+    val kv = SKeyValue(table, row, cf, qualifier, value, indexEdge.version)
+
+    Seq(kv)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/fd8119bc/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeDeserializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeDeserializable.scala b/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeDeserializable.scala
new file mode 100644
index 0000000..c97bed6
--- /dev/null
+++ b/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeDeserializable.scala
@@ -0,0 +1,84 @@
+package com.kakao.s2graph.core.storage.serde.snapshotedge.tall
+
+import com.kakao.s2graph.core.mysqls.{LabelIndex, LabelMeta}
+import com.kakao.s2graph.core.storage.StorageDeserializable._
+import com.kakao.s2graph.core.storage.{CanSKeyValue, Deserializable, SKeyValue}
+import com.kakao.s2graph.core.types.{HBaseType, LabelWithDirection, SourceAndTargetVertexIdPair, SourceVertexId}
+import com.kakao.s2graph.core.{Edge, QueryParam, SnapshotEdge, Vertex}
+import org.apache.hadoop.hbase.util.Bytes
+
+/**
+ * Reads a [[SnapshotEdge]] back from the "tall" layout, where the row holds the
+ * (source, target) vertex id pair followed by label/direction and the index-seq byte,
+ * and the value holds the edge state plus an optional pending edge.
+ */
+class SnapshotEdgeDeserializable extends Deserializable[SnapshotEdge] {
+
+  /**
+   * Splits a packed byte into (statusCode, op): the high nibble is the status code and the
+   * low nibble is the op.
+   *
+   * The shifted value is masked because `>>` on a negative byte sign-extends; without the
+   * mask a status code of 8 or more would come back negative.
+   *
+   * @param byte packed status/op byte
+   * @return (statusCode, op), each in 0..15
+   */
+  def statusCodeWithOp(byte: Byte): (Byte, Byte) = {
+    val statusCode = (byte >> 4) & 0x0F
+    val op = byte & 0x0F
+    (statusCode.toByte, op.toByte)
+  }
+
+  /**
+   * Rebuilds the snapshot edge stored in a single key/value.
+   *
+   * Value, in read order: one packed status/op byte, the props (which must include the
+   * timestamp prop), and -- only if bytes remain -- a pending edge made of a packed status/op
+   * byte, its props and an 8-byte lock timestamp.
+   *
+   * @param queryParam      supplies the label's schema version used for all decoding
+   * @param _kvs            exactly one element convertible to [[SKeyValue]]
+   * @param version         not read here; the label's schema version is used instead
+   * @param cacheElementOpt when defined, its vertex inner ids and label/direction are reused
+   *                        instead of parsing the row
+   * @return the decoded snapshot edge; its version is the cell timestamp and `lockTs` is None
+   */
+  override def fromKeyValuesInner[T: CanSKeyValue](queryParam: QueryParam,
+                                                   _kvs: Seq[T],
+                                                   version: String,
+                                                   cacheElementOpt: Option[SnapshotEdge]): SnapshotEdge = {
+    val kvs = _kvs.map { kv => implicitly[CanSKeyValue[T]].toSKeyValue(kv) }
+    assert(kvs.size == 1)
+
+    val kv = kvs.head
+    val schemaVer = queryParam.label.schemaVersion
+    val cellVersion = kv.timestamp
+    /** rowKey */
+    def parseRowV3(kv: SKeyValue, version: String) = {
+      var pos = 0
+      val (srcIdAndTgtId, srcIdAndTgtIdLen) = SourceAndTargetVertexIdPair.fromBytes(kv.row, pos, kv.row.length, version)
+      pos += srcIdAndTgtIdLen
+      val labelWithDir = LabelWithDirection(Bytes.toInt(kv.row, pos, 4))
+      pos += 4
+      val (labelIdxSeq, isInverted) = bytesToLabelIndexSeqWithIsInverted(kv.row, pos)
+
+      // id pair + 4-byte label/direction + 1 index-seq byte
+      val rowLen = srcIdAndTgtIdLen + 4 + 1
+      (srcIdAndTgtId.srcInnerId, srcIdAndTgtId.tgtInnerId, labelWithDir, labelIdxSeq, isInverted, rowLen)
+
+    }
+    val (srcInnerId, tgtInnerId, labelWithDir, _, _, _) = cacheElementOpt.map { e =>
+      (e.srcVertex.innerId, e.tgtVertex.innerId, e.labelWithDir, LabelIndex.DefaultSeq, true, 0)
+    }.getOrElse(parseRowV3(kv, schemaVer))
+
+    val srcVertexId = SourceVertexId(HBaseType.DEFAULT_COL_ID, srcInnerId)
+    // NOTE(review): the target is also wrapped in SourceVertexId -- confirm this is intended.
+    val tgtVertexId = SourceVertexId(HBaseType.DEFAULT_COL_ID, tgtInnerId)
+
+    val (props, op, ts, statusCode, _pendingEdgeOpt) = {
+      var pos = 0
+      val (statusCode, op) = statusCodeWithOp(kv.value(pos))
+      pos += 1
+      val (props, endAt) = bytesToKeyValuesWithTs(kv.value, pos, schemaVer)
+      val kvsMap = props.toMap
+      val ts = kvsMap(LabelMeta.timeStampSeq).innerVal.toString.toLong
+
+      pos = endAt
+      // Anything left after the edge's own props is a pending edge.
+      val _pendingEdgeOpt =
+        if (pos == kv.value.length) None
+        else {
+          val (pendingEdgeStatusCode, pendingEdgeOp) = statusCodeWithOp(kv.value(pos))
+          pos += 1
+          // val versionNum = Bytes.toLong(kv.value, pos, 8)
+          // pos += 8
+          val (pendingEdgeProps, endAt) = bytesToKeyValuesWithTs(kv.value, pos, schemaVer)
+          pos = endAt
+          val lockTs = Option(Bytes.toLong(kv.value, pos, 8))
+
+          val pendingEdge =
+            Edge(Vertex(srcVertexId, cellVersion),
+              Vertex(tgtVertexId, cellVersion),
+              labelWithDir, pendingEdgeOp,
+              cellVersion, pendingEdgeProps.toMap,
+              statusCode = pendingEdgeStatusCode, lockTs = lockTs)
+          Option(pendingEdge)
+        }
+
+      (kvsMap, op, ts, statusCode, _pendingEdgeOpt)
+    }
+
+    SnapshotEdge(Vertex(srcVertexId, ts), Vertex(tgtVertexId, ts),
+      labelWithDir, op, cellVersion, props, statusCode = statusCode,
+      pendingEdgeOpt = _pendingEdgeOpt, lockTs = None)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/fd8119bc/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeSerializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeSerializable.scala b/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeSerializable.scala
new file mode 100644
index 0000000..a507b90
--- /dev/null
+++ b/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeSerializable.scala
@@ -0,0 +1,47 @@
+package com.kakao.s2graph.core.storage.serde.snapshotedge.tall
+
+import com.kakao.s2graph.core.SnapshotEdge
+import com.kakao.s2graph.core.mysqls.LabelIndex
+import com.kakao.s2graph.core.storage.{SKeyValue, Serializable, 
StorageSerializable}
+import com.kakao.s2graph.core.types.SourceAndTargetVertexIdPair
+import org.apache.hadoop.hbase.util.Bytes
+
+
+/**
+ * Writes a [[SnapshotEdge]] in the "tall" layout: the row holds the (source, target) inner
+ * id pair, label/direction and the default index seq flagged as inverted; the qualifier
+ * is empty.
+ *
+ * @param snapshotEdge the snapshot edge to serialize
+ */
+class SnapshotEdgeSerializable(snapshotEdge: SnapshotEdge) extends Serializable[SnapshotEdge] {
+  import StorageSerializable._
+
+  val label = snapshotEdge.label
+  val table = label.hbaseTableName.getBytes()
+  val cf = Serializable.edgeCf
+
+  /** Packs `statusCode` into the high nibble and `op` into the low bits of a one-byte array. */
+  def statusCodeWithOp(statusCode: Byte, op: Byte): Array[Byte] =
+    Array(((statusCode << 4) | op).toByte)
+
+  /** The edge's own state: packed status/op byte followed by its props. */
+  def valueBytes() = {
+    val header = statusCodeWithOp(snapshotEdge.statusCode, snapshotEdge.op)
+    Bytes.add(header, propsToKeyValuesWithTs(snapshotEdge.props.toList))
+  }
+
+  /**
+   * Builds the single cell for this snapshot edge. When a pending edge is attached, its
+   * packed status/op byte, props and lock timestamp are appended after the edge's own state.
+   *
+   * @return a one-element sequence; the cell is stamped with `snapshotEdge.version`
+   */
+  override def toKeyValues: Seq[SKeyValue] = {
+    val rowKey = Bytes.add(
+      SourceAndTargetVertexIdPair(snapshotEdge.srcVertex.innerId, snapshotEdge.tgtVertex.innerId).bytes,
+      snapshotEdge.labelWithDir.bytes,
+      labelOrderSeqWithIsInverted(LabelIndex.DefaultSeq, isInverted = true))
+
+    val cellValue = snapshotEdge.pendingEdgeOpt.fold(valueBytes()) { pending =>
+      val pendingHeader = statusCodeWithOp(pending.statusCode, pending.op)
+      val pendingProps = propsToKeyValuesWithTs(pending.propsWithTs.toSeq)
+      val pendingLock = Bytes.toBytes(pending.lockTs.get)
+
+      Bytes.add(Bytes.add(valueBytes(), pendingHeader), Bytes.add(pendingProps, pendingLock))
+    }
+
+    Seq(SKeyValue(table, rowKey, cf, Array.empty[Byte], cellValue, snapshotEdge.version))
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/fd8119bc/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeDeserializable.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeDeserializable.scala b/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeDeserializable.scala
new file mode 100644
index 0000000..1174f50
--- /dev/null
+++ b/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeDeserializable.scala
@@ -0,0 +1,70 @@
+package com.kakao.s2graph.core.storage.serde.snapshotedge.wide
+
+import com.kakao.s2graph.core.mysqls.{LabelIndex, LabelMeta}
+import com.kakao.s2graph.core.storage.StorageDeserializable._
+import com.kakao.s2graph.core.storage.{CanSKeyValue, Deserializable}
+import com.kakao.s2graph.core.types.TargetVertexId
+import com.kakao.s2graph.core.{Edge, QueryParam, SnapshotEdge, Vertex}
+import org.apache.hadoop.hbase.util.Bytes
+
+/**
+ * Reads a [[SnapshotEdge]] back from the "wide" layout, where the target vertex id is
+ * stored in the qualifier and the value holds the edge state plus an optional pending edge.
+ */
+class SnapshotEdgeDeserializable extends Deserializable[SnapshotEdge] {
+
+  /**
+   * Splits a packed byte into (statusCode, op): the high nibble is the status code and the
+   * low nibble is the op.
+   *
+   * The shifted value is masked because `>>` on a negative byte sign-extends; without the
+   * mask a status code of 8 or more would come back negative.
+   *
+   * @param byte packed status/op byte
+   * @return (statusCode, op), each in 0..15
+   */
+  def statusCodeWithOp(byte: Byte): (Byte, Byte) = {
+    val statusCode = (byte >> 4) & 0x0F
+    val op = byte & 0x0F
+    (statusCode.toByte, op.toByte)
+  }
+
+  /**
+   * Rebuilds the snapshot edge stored in a single key/value.
+   *
+   * Value, in read order: one packed status/op byte, the props (which must include the
+   * timestamp prop), and -- only if bytes remain -- a pending edge made of a packed status/op
+   * byte, its props and an 8-byte lock timestamp.
+   *
+   * @param queryParam      supplies the label's schema version used for all decoding
+   * @param _kvs            exactly one element convertible to a key/value
+   * @param version         not read here; the label's schema version is used instead
+   * @param cacheElementOpt when defined, its source vertex id and label/direction are reused
+   *                        instead of parsing the row
+   * @return the decoded snapshot edge; its version is the cell timestamp and `lockTs` is None
+   */
+  override def fromKeyValuesInner[T: CanSKeyValue](queryParam: QueryParam,
+                                                   _kvs: Seq[T],
+                                                   version: String,
+                                                   cacheElementOpt: Option[SnapshotEdge]): SnapshotEdge = {
+    val kvs = _kvs.map { kv => implicitly[CanSKeyValue[T]].toSKeyValue(kv) }
+    assert(kvs.size == 1)
+
+    val kv = kvs.head
+    val schemaVer = queryParam.label.schemaVersion
+    val cellVersion = kv.timestamp
+
+    val (srcVertexId, labelWithDir, _, _, _) = cacheElementOpt.map { e =>
+      (e.srcVertex.id, e.labelWithDir, LabelIndex.DefaultSeq, true, 0)
+    }.getOrElse(parseRow(kv, schemaVer))
+
+    val (tgtVertexId, props, op, ts, statusCode, _pendingEdgeOpt) = {
+      // The whole qualifier is the target vertex id.
+      val (tgtVertexId, _) = TargetVertexId.fromBytes(kv.qualifier, 0, kv.qualifier.length, schemaVer)
+      var pos = 0
+      val (statusCode, op) = statusCodeWithOp(kv.value(pos))
+      pos += 1
+      val (props, endAt) = bytesToKeyValuesWithTs(kv.value, pos, schemaVer)
+      val kvsMap = props.toMap
+      val ts = kvsMap(LabelMeta.timeStampSeq).innerVal.toString.toLong
+
+      pos = endAt
+      // Anything left after the edge's own props is a pending edge.
+      val _pendingEdgeOpt =
+        if (pos == kv.value.length) None
+        else {
+          val (pendingEdgeStatusCode, pendingEdgeOp) = statusCodeWithOp(kv.value(pos))
+          pos += 1
+          // val versionNum = Bytes.toLong(kv.value, pos, 8)
+          // pos += 8
+          val (pendingEdgeProps, endAt) = bytesToKeyValuesWithTs(kv.value, pos, schemaVer)
+          pos = endAt
+          val lockTs = Option(Bytes.toLong(kv.value, pos, 8))
+
+          val pendingEdge =
+            Edge(Vertex(srcVertexId, cellVersion),
+              Vertex(tgtVertexId, cellVersion),
+              labelWithDir, pendingEdgeOp,
+              cellVersion, pendingEdgeProps.toMap,
+              statusCode = pendingEdgeStatusCode, lockTs = lockTs)
+          Option(pendingEdge)
+        }
+
+      (tgtVertexId, kvsMap, op, ts, statusCode, _pendingEdgeOpt)
+    }
+
+    SnapshotEdge(Vertex(srcVertexId, ts), Vertex(tgtVertexId, ts),
+      labelWithDir, op, cellVersion, props, statusCode = statusCode,
+      pendingEdgeOpt = _pendingEdgeOpt, lockTs = None)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/fd8119bc/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeSerializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeSerializable.scala b/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeSerializable.scala
new file mode 100644
index 0000000..e6074d9
--- /dev/null
+++ b/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeSerializable.scala
@@ -0,0 +1,50 @@
+package com.kakao.s2graph.core.storage.serde.snapshotedge.wide
+
+import com.kakao.s2graph.core.SnapshotEdge
+import com.kakao.s2graph.core.mysqls.LabelIndex
+import com.kakao.s2graph.core.storage.{SKeyValue, Serializable, StorageSerializable}
+import com.kakao.s2graph.core.types.VertexId
+import org.apache.hadoop.hbase.util.Bytes
+
+
+/**
+ * Serializes a [[SnapshotEdge]] into a single wide-layout [[SKeyValue]].
+ * 
@param snapshotEdge snapshot edge to write; the row is source vertex id + label/direction +
+ *                     default index seq (flagged inverted) and the qualifier is the target vertex id
+ */
+class SnapshotEdgeSerializable(snapshotEdge: SnapshotEdge) extends Serializable[SnapshotEdge] {
+  import StorageSerializable._
+
+  val label = snapshotEdge.label
+  val table = label.hbaseTableName.getBytes()
+  val cf = Serializable.edgeCf
+
+  /** Packs `statusCode` into the high nibble and `op` into the low bits of a one-byte array. */
+  def statusCodeWithOp(statusCode: Byte, op: Byte): Array[Byte] =
+    Array(((statusCode << 4) | op).toByte)
+
+  /** The edge's own state: packed status/op byte followed by its props. */
+  def valueBytes() = {
+    val header = statusCodeWithOp(snapshotEdge.statusCode, snapshotEdge.op)
+    Bytes.add(header, propsToKeyValuesWithTs(snapshotEdge.props.toList))
+  }
+
+  /**
+   * Builds the single cell for this snapshot edge. When a pending edge is attached, its
+   * packed status/op byte, props and lock timestamp are appended after the edge's own state.
+   *
+   * @return a one-element sequence; the cell is stamped with `snapshotEdge.version`
+   */
+  override def toKeyValues: Seq[SKeyValue] = {
+    val rowKey = Bytes.add(
+      VertexId.toSourceVertexId(snapshotEdge.srcVertex.id).bytes,
+      snapshotEdge.labelWithDir.bytes,
+      labelOrderSeqWithIsInverted(LabelIndex.DefaultSeq, isInverted = true))
+    val columnKey = VertexId.toTargetVertexId(snapshotEdge.tgtVertex.id).bytes
+
+    val cellValue = snapshotEdge.pendingEdgeOpt.fold(valueBytes()) { pending =>
+      val pendingHeader = statusCodeWithOp(pending.statusCode, pending.op)
+      val pendingProps = propsToKeyValuesWithTs(pending.propsWithTs.toSeq)
+      val pendingLock = Bytes.toBytes(pending.lockTs.get)
+      Bytes.add(Bytes.add(valueBytes(), pendingHeader), Bytes.add(pendingProps, pendingLock))
+    }
+
+    Seq(SKeyValue(table, rowKey, cf, columnKey, cellValue, snapshotEdge.version))
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/fd8119bc/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/vertex/VertexDeserializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/vertex/VertexDeserializable.scala 
b/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/vertex/VertexDeserializable.scala
new file mode 100644
index 0000000..e355401
--- /dev/null
+++ b/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/vertex/VertexDeserializable.scala
@@ -0,0 +1,46 @@
+package com.kakao.s2graph.core.storage.serde.vertex
+
+import com.kakao.s2graph.core.storage.{CanSKeyValue, Deserializable}
+import com.kakao.s2graph.core.types.{InnerVal, InnerValLike, VertexId}
+import com.kakao.s2graph.core.{QueryParam, Vertex}
+import org.apache.hadoop.hbase.util.Bytes
+import scala.collection.mutable.ListBuffer
+
+/** Rebuilds a [[Vertex]] from the cells of one vertex row, one cell per prop or label membership. */
+class VertexDeserializable extends Deserializable[Vertex] {
+  /**
+   * Decodes the vertex id from the first cell's row and folds every cell into either a prop
+   * or a label id the vertex belongs to.
+   *
+   * @param queryParam      not read here
+   * @param _kvs            the cells of the vertex row; must not be empty
+   * @param version         schema version handed to the id / value decoders
+   * @param cacheElementOpt not read here
+   * @return the vertex, timestamped with the newest cell timestamp
+   */
+  def fromKeyValuesInner[T: CanSKeyValue](queryParam: QueryParam,
+                                          _kvs: Seq[T],
+                                          version: String,
+                                          cacheElementOpt: Option[Vertex]): Vertex = {
+
+    val converter = implicitly[CanSKeyValue[T]]
+    val cells = _kvs.map { raw => converter.toSKeyValue(raw) }
+
+    val firstRow = cells.head.row
+    val (vertexId, _) = VertexId.fromBytes(firstRow, 0, firstRow.length, version)
+
+    val props = new collection.mutable.HashMap[Int, InnerValLike]
+    val labelIds = new ListBuffer[Int]
+    var latestTs = Long.MinValue
+
+    cells.foreach { cell =>
+      val qualifier = cell.qualifier
+      // A one-byte qualifier is the key itself; anything else is read as an int.
+      val propKey = qualifier.length match {
+        case 1 => qualifier.head.toInt
+        case _ => Bytes.toInt(qualifier)
+      }
+
+      latestTs = math.max(latestTs, cell.timestamp)
+
+      if (Vertex.isLabelId(propKey)) labelIds += Vertex.toLabelId(propKey)
+      else {
+        val bytes = cell.value
+        val (decoded, _) = InnerVal.fromBytes(bytes, 0, bytes.length, version)
+        props += (propKey -> decoded)
+      }
+    }
+
+    assert(latestTs != Long.MinValue)
+    Vertex(vertexId, latestTs, props.toMap, belongLabelIds = labelIds)
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/fd8119bc/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/vertex/VertexSerializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/vertex/VertexSerializable.scala 
b/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/vertex/VertexSerializable.scala
new file mode 100644
index 0000000..0c17592
--- /dev/null
+++ b/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/vertex/VertexSerializable.scala
@@ -0,0 +1,20 @@
+package com.kakao.s2graph.core.storage.serde.vertex
+
+import com.kakao.s2graph.core.Vertex
+import com.kakao.s2graph.core.storage.{SKeyValue, Serializable}
+import org.apache.hadoop.hbase.util.Bytes
+
+
+/**
+ * Writes a [[Vertex]] as one cell per prop plus one empty-valued cell per label the
+ * vertex belongs to, all under the row of the vertex id.
+ *
+ * @param vertex the vertex to serialize
+ */
+case class VertexSerializable(vertex: Vertex) extends Serializable[Vertex] {
+
+  val cf = Serializable.vertexCf
+
+  /**
+   * Builds the cells for this vertex.
+   *
+   * @return one cell per entry of `props ++ defaultProps` (qualifier = prop key as an int,
+   *         value = encoded prop) followed by one cell per belonging label (qualifier =
+   *         `Vertex.toPropKey(labelId)`, empty value); every cell is stamped with `vertex.ts`
+   */
+  override def toKeyValues: Seq[SKeyValue] = {
+    // Encode the table name once, as UTF-8, instead of once per cell with the default charset.
+    val table = Bytes.toBytes(vertex.hbaseTableName)
+    val row = vertex.id.bytes
+    // Go through a Seq so the pairs are not collected into a Map keyed by byte arrays.
+    val base = (vertex.props ++ vertex.defaultProps).toSeq.map { case (k, v) => Bytes.toBytes(k) -> v.bytes }
+    val belongsTo = vertex.belongLabelIds.map { labelId => Bytes.toBytes(Vertex.toPropKey(labelId)) -> Array.empty[Byte] }
+    (base ++ belongsTo).map { case (qualifier, value) =>
+      SKeyValue(table, row, cf, qualifier, value, vertex.ts)
+    }.toSeq
+  }
+}
