Repository: incubator-s2graph Updated Branches: refs/heads/master 56829adc7 -> 0d1854450
[S2GRAPH-81]: Separate Serializable's toKeyValues into 3, toRowKey, toQualifier, toValue. Split toKeyValues into toRowKey, toQualifier, and toValue, so that buildRequest only uses toRowKey and toQualifier. JIRA: [S2GRAPH-81] https://issues.apache.org/jira/browse/S2GRAPH-81 Pull Request: Closes #52 Authors: DOYUNG YOON: [email protected] Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/0d185445 Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/0d185445 Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/0d185445 Branch: refs/heads/master Commit: 0d185445056555593b524b02f689aecb9d68906a Parents: 56829ad Author: DO YUNG YOON <[email protected]> Authored: Sat Jun 11 23:58:51 2016 +0900 Committer: DO YUNG YOON <[email protected]> Committed: Sat Jun 11 23:58:51 2016 +0900 ---------------------------------------------------------------------- CHANGES | 2 + .../core/storage/StorageSerializable.scala | 18 +- .../core/storage/hbase/AsynchbaseStorage.scala | 19 +- .../tall/IndexEdgeDeserializable.scala | 171 ++++++++------- .../indexedge/tall/IndexEdgeSerializable.scala | 33 ++- .../wide/IndexEdgeDeserializable.scala | 214 ++++++++++--------- .../indexedge/wide/IndexEdgeSerializable.scala | 56 +++-- .../tall/SnapshotEdgeSerializable.scala | 18 +- .../wide/SnapshotEdgeSerializable.scala | 22 +- .../serde/vertex/VertexSerializable.scala | 12 +- 10 files changed, 290 insertions(+), 275 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/0d185445/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index 56f14a0..7bcc7ef 100644 --- a/CHANGES +++ b/CHANGES @@ -135,6 +135,8 @@ Release 0.12.1 - unreleased S2GRAPH-75: Use an embedded database as the default metadata storage. 
(Contributed by Jong Wook Kim<[email protected]>, committed by DOYUNG YOON) + S2GRAPH-81: Separate Serializable's toKeyValues into 3, toRowKey, toQualifier, toValue. (Committed by DOYUNG YOON). + TEST S2GRAPH-21: Change PostProcessBenchmarkSpec not to store and fetch test data from storage. (Committed by DOYUNG YOON). http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/0d185445/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageSerializable.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageSerializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageSerializable.scala index b6435e4..b7326f5 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageSerializable.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageSerializable.scala @@ -56,5 +56,21 @@ object StorageSerializable { } trait StorageSerializable[E] { - def toKeyValues: Seq[SKeyValue] + val cf = Serializable.edgeCf + + val table: Array[Byte] + val ts: Long + + def toRowKey: Array[Byte] + def toQualifier: Array[Byte] + def toValue: Array[Byte] + + def toKeyValues: Seq[SKeyValue] = { + val row = toRowKey + val qualifier = toQualifier + val value = toValue + val kv = SKeyValue(table, row, cf, qualifier, value, ts) + + Seq(kv) + } } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/0d185445/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala index 66a1be4..4bd222f 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala +++ 
b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala @@ -181,18 +181,17 @@ class AsynchbaseStorage(override val config: Config)(implicit ec: ExecutionConte val label = queryParam.label val edge = toRequestEdge(queryRequest) - val kv = if (queryParam.tgtVertexInnerIdOpt.isDefined) { + val serializer = if (queryParam.tgtVertexInnerIdOpt.isDefined) { val snapshotEdge = edge.toSnapshotEdge - snapshotEdgeSerializer(snapshotEdge).toKeyValues.head - // new GetRequest(label.hbaseTableName.getBytes, kv.row, edgeCf, kv.qualifier) + snapshotEdgeSerializer(snapshotEdge) } else { - val indexedEdgeOpt = edge.edgesWithIndex.find(e => e.labelIndexSeq == queryParam.labelOrderSeq) - assert(indexedEdgeOpt.isDefined) - - val indexedEdge = indexedEdgeOpt.get - indexEdgeSerializer(indexedEdge).toKeyValues.head + val indexEdge = IndexEdge(edge.srcVertex, edge.tgtVertex, edge.labelWithDir, + edge.op, edge.version, queryParam.labelOrderSeq, edge.propsWithTs) + indexEdgeSerializer(indexEdge) } + val (rowKey, qualifier) = (serializer.toRowKey, serializer.toQualifier) + val (minTs, maxTs) = queryParam.duration.getOrElse((0L, Long.MaxValue)) label.schemaVersion match { @@ -246,8 +245,8 @@ class AsynchbaseStorage(override val config: Config)(implicit ec: ExecutionConte scanner case _ => val get = - if (queryParam.tgtVertexInnerIdOpt.isDefined) new GetRequest(label.hbaseTableName.getBytes, kv.row, edgeCf, kv.qualifier) - else new GetRequest(label.hbaseTableName.getBytes, kv.row, edgeCf) + if (queryParam.tgtVertexInnerIdOpt.isDefined) new GetRequest(label.hbaseTableName.getBytes, rowKey, edgeCf, qualifier) + else new GetRequest(label.hbaseTableName.getBytes, rowKey, edgeCf) get.maxVersions(1) get.setFailfast(true) http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/0d185445/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala ---------------------------------------------------------------------- diff 
--git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala index e80a805..e6265f7 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala @@ -27,8 +27,6 @@ import org.apache.s2graph.core.types._ import org.apache.s2graph.core.{GraphUtil, IndexEdge, QueryParam, Vertex} import scala.collection.immutable -import scala.collection.immutable - class IndexEdgeDeserializable(bytesToLongFunc: (Array[Byte], Int) => Long = bytesToLong) extends Deserializable[IndexEdge] { import StorageDeserializable._ @@ -76,88 +74,87 @@ class IndexEdgeDeserializable(bytesToLongFunc: (Array[Byte], Int) => Long = byte (Array.empty[(Byte, InnerValLike)], 0) } - override def fromKeyValuesInner[T: CanSKeyValue](queryParam: QueryParam, - _kvs: Seq[T], - schemaVer: String, - cacheElementOpt: Option[IndexEdge]): IndexEdge = { - - assert(_kvs.size == 1) - - val kvs = _kvs.map { kv => implicitly[CanSKeyValue[T]].toSKeyValue(kv) } - - val kv = kvs.head - val version = kv.timestamp - // logger.debug(s"[Des]: ${kv.row.toList}, ${kv.qualifier.toList}, ${kv.value.toList}") - var pos = 0 - val (srcVertexId, srcIdLen) = SourceVertexId.fromBytes(kv.row, pos, kv.row.length, schemaVer) - pos += srcIdLen - val labelWithDir = LabelWithDirection(Bytes.toInt(kv.row, pos, 4)) - pos += 4 - val (labelIdxSeq, isInverted) = bytesToLabelIndexSeqWithIsInverted(kv.row, pos) - pos += 1 - - val op = kv.row(pos) - pos += 1 - - if (pos == kv.row.length) { - // degree - // val degreeVal = Bytes.toLong(kv.value) - val degreeVal = bytesToLongFunc(kv.value, 0) - val ts = kv.timestamp - val props = Map(LabelMeta.timeStampSeq -> InnerValLikeWithTs.withLong(ts, ts, schemaVer), - LabelMeta.degreeSeq -> 
InnerValLikeWithTs.withLong(degreeVal, ts, schemaVer)) - val tgtVertexId = VertexId(HBaseType.DEFAULT_COL_ID, InnerVal.withStr("0", schemaVer)) - IndexEdge(Vertex(srcVertexId, ts), Vertex(tgtVertexId, ts), labelWithDir, op, ts, labelIdxSeq, props) - } else { - // not degree edge - - - val (idxPropsRaw, endAt) = bytesToProps(kv.row, pos, schemaVer) - pos = endAt - val (tgtVertexIdRaw, tgtVertexIdLen) = if (endAt == kv.row.length) { - (HBaseType.defaultTgtVertexId, 0) - } else { - TargetVertexId.fromBytes(kv.row, endAt, kv.row.length, schemaVer) - } - - - val allProps = immutable.Map.newBuilder[Byte, InnerValLikeWithTs] - val index = queryParam.label.indicesMap.getOrElse(labelIdxSeq, throw new RuntimeException(s"invalid index seq: ${queryParam.label.id.get}, ${labelIdxSeq}")) - - /** process indexProps */ - for { - (seq, (k, v)) <- index.metaSeqs.zip(idxPropsRaw) - } { - if (k == LabelMeta.degreeSeq) allProps += k -> InnerValLikeWithTs(v, version) - else allProps += seq -> InnerValLikeWithTs(v, version) - } - - /** process props */ - if (op == GraphUtil.operations("incrementCount")) { - // val countVal = Bytes.toLong(kv.value) - val countVal = bytesToLongFunc(kv.value, 0) - allProps += (LabelMeta.countSeq -> InnerValLikeWithTs.withLong(countVal, version, schemaVer)) - } else { - val (props, endAt) = bytesToKeyValues(kv.value, 0, kv.value.length, schemaVer) - props.foreach { case (k, v) => - allProps += (k -> InnerValLikeWithTs(v, version)) - } - } - val _mergedProps = allProps.result() - val mergedProps = - if (_mergedProps.contains(LabelMeta.timeStampSeq)) _mergedProps - else _mergedProps + (LabelMeta.timeStampSeq -> InnerValLikeWithTs.withLong(version, version, schemaVer)) - - /** process tgtVertexId */ - val tgtVertexId = - mergedProps.get(LabelMeta.toSeq) match { - case None => tgtVertexIdRaw - case Some(vId) => TargetVertexId(HBaseType.DEFAULT_COL_ID, vId.innerVal) - } - - - IndexEdge(Vertex(srcVertexId, version), Vertex(tgtVertexId, version), labelWithDir, op, 
version, labelIdxSeq, mergedProps) - - } - } - } + override def fromKeyValuesInner[T: CanSKeyValue](queryParam: QueryParam, + _kvs: Seq[T], + schemaVer: String, + cacheElementOpt: Option[IndexEdge]): IndexEdge = { + + assert(_kvs.size == 1) + + val kvs = _kvs.map { kv => implicitly[CanSKeyValue[T]].toSKeyValue(kv) } + + val kv = kvs.head + val version = kv.timestamp + // logger.debug(s"[Des]: ${kv.row.toList}, ${kv.qualifier.toList}, ${kv.value.toList}") + var pos = 0 + val (srcVertexId, srcIdLen) = SourceVertexId.fromBytes(kv.row, pos, kv.row.length, schemaVer) + pos += srcIdLen + val labelWithDir = LabelWithDirection(Bytes.toInt(kv.row, pos, 4)) + pos += 4 + val (labelIdxSeq, isInverted) = bytesToLabelIndexSeqWithIsInverted(kv.row, pos) + pos += 1 + + val op = kv.row(pos) + pos += 1 + + if (pos == kv.row.length) { + // degree + // val degreeVal = Bytes.toLong(kv.value) + val degreeVal = bytesToLongFunc(kv.value, 0) + val ts = kv.timestamp + val props = Map(LabelMeta.timeStampSeq -> InnerValLikeWithTs.withLong(ts, ts, schemaVer), + LabelMeta.degreeSeq -> InnerValLikeWithTs.withLong(degreeVal, ts, schemaVer)) + val tgtVertexId = VertexId(HBaseType.DEFAULT_COL_ID, InnerVal.withStr("0", schemaVer)) + IndexEdge(Vertex(srcVertexId, ts), Vertex(tgtVertexId, ts), labelWithDir, op, ts, labelIdxSeq, props) + } else { + // not degree edge + + + val (idxPropsRaw, endAt) = bytesToProps(kv.row, pos, schemaVer) + pos = endAt + val (tgtVertexIdRaw, tgtVertexIdLen) = if (endAt == kv.row.length) { + (HBaseType.defaultTgtVertexId, 0) + } else { + TargetVertexId.fromBytes(kv.row, endAt, kv.row.length, schemaVer) + } + + val allProps = immutable.Map.newBuilder[Byte, InnerValLikeWithTs] + val index = queryParam.label.indicesMap.getOrElse(labelIdxSeq, throw new RuntimeException(s"invalid index seq: ${queryParam.label.id.get}, ${labelIdxSeq}")) + + /** process indexProps */ + for { + (seq, (k, v)) <- index.metaSeqs.zip(idxPropsRaw) + } { + if (k == LabelMeta.degreeSeq) allProps += k -> 
InnerValLikeWithTs(v, version) + else allProps += seq -> InnerValLikeWithTs(v, version) + } + + /** process props */ + if (op == GraphUtil.operations("incrementCount")) { + // val countVal = Bytes.toLong(kv.value) + val countVal = bytesToLongFunc(kv.value, 0) + allProps += (LabelMeta.countSeq -> InnerValLikeWithTs.withLong(countVal, version, schemaVer)) + } else { + val (props, endAt) = bytesToKeyValues(kv.value, 0, kv.value.length, schemaVer) + props.foreach { case (k, v) => + allProps += (k -> InnerValLikeWithTs(v, version)) + } + } + val _mergedProps = allProps.result() + val mergedProps = + if (_mergedProps.contains(LabelMeta.timeStampSeq)) _mergedProps + else _mergedProps + (LabelMeta.timeStampSeq -> InnerValLikeWithTs.withLong(version, version, schemaVer)) + + /** process tgtVertexId */ + val tgtVertexId = + mergedProps.get(LabelMeta.toSeq) match { + case None => tgtVertexIdRaw + case Some(vId) => TargetVertexId(HBaseType.DEFAULT_COL_ID, vId.innerVal) + } + + + IndexEdge(Vertex(srcVertexId, version), Vertex(tgtVertexId, version), labelWithDir, op, version, labelIdxSeq, mergedProps) + + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/0d185445/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeSerializable.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeSerializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeSerializable.scala index d00877e..f17e41c 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeSerializable.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeSerializable.scala @@ -29,14 +29,13 @@ import org.apache.s2graph.core.{GraphUtil, IndexEdge} class IndexEdgeSerializable(indexEdge: IndexEdge) extends Serializable[IndexEdge] { import 
StorageSerializable._ - val label = indexEdge.label - val table = label.hbaseTableName.getBytes() - val cf = Serializable.edgeCf + override val ts = indexEdge.version + override val table = indexEdge.label.hbaseTableName.getBytes() - val idxPropsMap = indexEdge.orders.toMap - val idxPropsBytes = propsToBytes(indexEdge.orders) + def idxPropsMap = indexEdge.orders.toMap + def idxPropsBytes = propsToBytes(indexEdge.orders) - override def toKeyValues: Seq[SKeyValue] = { + override def toRowKey: Array[Byte] = { val srcIdBytes = VertexId.toSourceVertexId(indexEdge.srcVertex.id).bytes val labelWithDirBytes = indexEdge.labelWithDir.bytes val labelIndexSeqWithIsInvertedBytes = labelOrderSeqWithIsInverted(indexEdge.labelIndexSeq, isInverted = false) @@ -53,20 +52,16 @@ class IndexEdgeSerializable(indexEdge: IndexEdge) extends Serializable[IndexEdge } /** TODO search usage of op byte. if there is no, then remove opByte */ - val rowBytes = Bytes.add(row, Array.fill(1)(GraphUtil.defaultOpByte), qualifier) - // val qualifierBytes = Array.fill(1)(indexEdge.op) - val qualifierBytes = Array.empty[Byte] + Bytes.add(row, Array.fill(1)(GraphUtil.defaultOpByte), qualifier) + } - val value = - if (indexEdge.degreeEdge) - Bytes.toBytes(indexEdge.props(LabelMeta.degreeSeq).innerVal.toString().toLong) - else if (indexEdge.op == GraphUtil.operations("incrementCount")) - Bytes.toBytes(indexEdge.props(LabelMeta.countSeq).innerVal.toString().toLong) - else propsToKeyValues(indexEdge.metas.toSeq) + override def toQualifier: Array[Byte] = Array.empty[Byte] - val kv = SKeyValue(table, rowBytes, cf, qualifierBytes, value, indexEdge.version) + override def toValue: Array[Byte] = + if (indexEdge.degreeEdge) + Bytes.toBytes(indexEdge.props(LabelMeta.degreeSeq).innerVal.toString().toLong) + else if (indexEdge.op == GraphUtil.operations("incrementCount")) + Bytes.toBytes(indexEdge.props(LabelMeta.countSeq).innerVal.toString().toLong) + else propsToKeyValues(indexEdge.metas.toSeq) - // 
logger.debug(s"[Ser]: ${kv.row.toList}, ${kv.qualifier.toList}, ${kv.value.toList}") - Seq(kv) - } } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/0d185445/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala index b1b933f..5a8fa42 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala @@ -28,109 +28,113 @@ import scala.collection.immutable import scala.collection.immutable +import scala.collection.immutable + class IndexEdgeDeserializable(bytesToLongFunc: (Array[Byte], Int) => Long = bytesToLong) extends Deserializable[IndexEdge] { - import StorageDeserializable._ - - type QualifierRaw = (Array[(Byte, InnerValLike)], VertexId, Byte, Boolean, Int) - type ValueRaw = (Array[(Byte, InnerValLike)], Int) - - private def parseDegreeQualifier(kv: SKeyValue, version: String): QualifierRaw = { - // val degree = Bytes.toLong(kv.value) - val degree = bytesToLongFunc(kv.value, 0) - val idxPropsRaw = Array(LabelMeta.degreeSeq -> InnerVal.withLong(degree, version)) - val tgtVertexIdRaw = VertexId(HBaseType.DEFAULT_COL_ID, InnerVal.withStr("0", version)) - (idxPropsRaw, tgtVertexIdRaw, GraphUtil.operations("insert"), false, 0) - } - - private def parseQualifier(kv: SKeyValue, version: String): QualifierRaw = { - var qualifierLen = 0 - var pos = 0 - val (idxPropsRaw, idxPropsLen, tgtVertexIdRaw, tgtVertexIdLen) = { - val (props, endAt) = bytesToProps(kv.qualifier, pos, version) - pos = endAt - qualifierLen += endAt - val (tgtVertexId, tgtVertexIdLen) = 
if (endAt == kv.qualifier.length) { - (HBaseType.defaultTgtVertexId, 0) - } else { - TargetVertexId.fromBytes(kv.qualifier, endAt, kv.qualifier.length, version) - } - qualifierLen += tgtVertexIdLen - (props, endAt, tgtVertexId, tgtVertexIdLen) - } - val (op, opLen) = - if (kv.qualifier.length == qualifierLen) (GraphUtil.defaultOpByte, 0) - else (kv.qualifier(qualifierLen), 1) - - qualifierLen += opLen - - (idxPropsRaw, tgtVertexIdRaw, op, tgtVertexIdLen != 0, qualifierLen) - } - - private def parseValue(kv: SKeyValue, version: String): ValueRaw = { - val (props, endAt) = bytesToKeyValues(kv.value, 0, kv.value.length, version) - (props, endAt) - } - - private def parseDegreeValue(kv: SKeyValue, version: String): ValueRaw = { - (Array.empty[(Byte, InnerValLike)], 0) - } - - override def fromKeyValuesInner[T: CanSKeyValue](queryParam: QueryParam, - _kvs: Seq[T], - schemaVer: String, - cacheElementOpt: Option[IndexEdge]): IndexEdge = { - assert(_kvs.size == 1) - - val kvs = _kvs.map { kv => implicitly[CanSKeyValue[T]].toSKeyValue(kv) } - - val kv = kvs.head - val version = kv.timestamp - - val (srcVertexId, labelWithDir, labelIdxSeq, _, _) = cacheElementOpt.map { e => - (e.srcVertex.id, e.labelWithDir, e.labelIndexSeq, false, 0) - }.getOrElse(parseRow(kv, schemaVer)) - - val (idxPropsRaw, tgtVertexIdRaw, op, tgtVertexIdInQualifier, _) = - if (kv.qualifier.isEmpty) parseDegreeQualifier(kv, schemaVer) - else parseQualifier(kv, schemaVer) - - val allProps = immutable.Map.newBuilder[Byte, InnerValLikeWithTs] - val index = queryParam.label.indicesMap.getOrElse(labelIdxSeq, throw new RuntimeException(s"invalid index seq: ${queryParam.label.id.get}, ${labelIdxSeq}")) - - /** process indexProps */ - for { - (seq, (k, v)) <- index.metaSeqs.zip(idxPropsRaw) - } { - if (k == LabelMeta.degreeSeq) allProps += k -> InnerValLikeWithTs(v, version) - else allProps += seq -> InnerValLikeWithTs(v, version) - } - - /** process props */ - if (op == GraphUtil.operations("incrementCount")) { 
- // val countVal = Bytes.toLong(kv.value) - val countVal = bytesToLongFunc(kv.value, 0) - allProps += (LabelMeta.countSeq -> InnerValLikeWithTs.withLong(countVal, version, schemaVer)) - } else if (kv.qualifier.isEmpty) { - val countVal = bytesToLongFunc(kv.value, 0) - allProps += (LabelMeta.degreeSeq -> InnerValLikeWithTs.withLong(countVal, version, schemaVer)) - } else { - val (props, _) = bytesToKeyValues(kv.value, 0, kv.value.length, schemaVer) - props.foreach { case (k, v) => allProps += (k -> InnerValLikeWithTs(v, version)) } - } - - val _mergedProps = allProps.result() - val mergedProps = - if (_mergedProps.contains(LabelMeta.timeStampSeq)) _mergedProps - else _mergedProps + (LabelMeta.timeStampSeq -> InnerValLikeWithTs.withLong(version, version, schemaVer)) - - /** process tgtVertexId */ - val tgtVertexId = - mergedProps.get(LabelMeta.toSeq) match { - case None => tgtVertexIdRaw - case Some(vId) => TargetVertexId(HBaseType.DEFAULT_COL_ID, vId.innerVal) - } - - IndexEdge(Vertex(srcVertexId, version), Vertex(tgtVertexId, version), labelWithDir, op, version, labelIdxSeq, mergedProps) - - } - } + + + import StorageDeserializable._ + + type QualifierRaw = (Array[(Byte, InnerValLike)], VertexId, Byte, Boolean, Int) + type ValueRaw = (Array[(Byte, InnerValLike)], Int) + + private def parseDegreeQualifier(kv: SKeyValue, version: String): QualifierRaw = { + // val degree = Bytes.toLong(kv.value) + val degree = bytesToLongFunc(kv.value, 0) + val idxPropsRaw = Array(LabelMeta.degreeSeq -> InnerVal.withLong(degree, version)) + val tgtVertexIdRaw = VertexId(HBaseType.DEFAULT_COL_ID, InnerVal.withStr("0", version)) + (idxPropsRaw, tgtVertexIdRaw, GraphUtil.operations("insert"), false, 0) + } + + private def parseQualifier(kv: SKeyValue, version: String): QualifierRaw = { + var qualifierLen = 0 + var pos = 0 + val (idxPropsRaw, idxPropsLen, tgtVertexIdRaw, tgtVertexIdLen) = { + val (props, endAt) = bytesToProps(kv.qualifier, pos, version) + pos = endAt + qualifierLen += 
endAt + val (tgtVertexId, tgtVertexIdLen) = if (endAt == kv.qualifier.length) { + (HBaseType.defaultTgtVertexId, 0) + } else { + TargetVertexId.fromBytes(kv.qualifier, endAt, kv.qualifier.length, version) + } + qualifierLen += tgtVertexIdLen + (props, endAt, tgtVertexId, tgtVertexIdLen) + } + val (op, opLen) = + if (kv.qualifier.length == qualifierLen) (GraphUtil.defaultOpByte, 0) + else (kv.qualifier(qualifierLen), 1) + + qualifierLen += opLen + + (idxPropsRaw, tgtVertexIdRaw, op, tgtVertexIdLen != 0, qualifierLen) + } + + private def parseValue(kv: SKeyValue, version: String): ValueRaw = { + val (props, endAt) = bytesToKeyValues(kv.value, 0, kv.value.length, version) + (props, endAt) + } + + private def parseDegreeValue(kv: SKeyValue, version: String): ValueRaw = { + (Array.empty[(Byte, InnerValLike)], 0) + } + + override def fromKeyValuesInner[T: CanSKeyValue](queryParam: QueryParam, + _kvs: Seq[T], + schemaVer: String, + cacheElementOpt: Option[IndexEdge]): IndexEdge = { + assert(_kvs.size == 1) + + val kvs = _kvs.map { kv => implicitly[CanSKeyValue[T]].toSKeyValue(kv) } + + val kv = kvs.head + val version = kv.timestamp + + val (srcVertexId, labelWithDir, labelIdxSeq, _, _) = cacheElementOpt.map { e => + (e.srcVertex.id, e.labelWithDir, e.labelIndexSeq, false, 0) + }.getOrElse(parseRow(kv, schemaVer)) + + val (idxPropsRaw, tgtVertexIdRaw, op, tgtVertexIdInQualifier, _) = + if (kv.qualifier.isEmpty) parseDegreeQualifier(kv, schemaVer) + else parseQualifier(kv, schemaVer) + + val allProps = immutable.Map.newBuilder[Byte, InnerValLikeWithTs] + val index = queryParam.label.indicesMap.getOrElse(labelIdxSeq, throw new RuntimeException(s"invalid index seq: ${queryParam.label.id.get}, ${labelIdxSeq}")) + + /** process indexProps */ + for { + (seq, (k, v)) <- index.metaSeqs.zip(idxPropsRaw) + } { + if (k == LabelMeta.degreeSeq) allProps += k -> InnerValLikeWithTs(v, version) + else allProps += seq -> InnerValLikeWithTs(v, version) + } + + /** process props */ + if (op 
== GraphUtil.operations("incrementCount")) { + // val countVal = Bytes.toLong(kv.value) + val countVal = bytesToLongFunc(kv.value, 0) + allProps += (LabelMeta.countSeq -> InnerValLikeWithTs.withLong(countVal, version, schemaVer)) + } else if (kv.qualifier.isEmpty) { + val countVal = bytesToLongFunc(kv.value, 0) + allProps += (LabelMeta.degreeSeq -> InnerValLikeWithTs.withLong(countVal, version, schemaVer)) + } else { + val (props, _) = bytesToKeyValues(kv.value, 0, kv.value.length, schemaVer) + props.foreach { case (k, v) => allProps += (k -> InnerValLikeWithTs(v, version)) } + } + + val _mergedProps = allProps.result() + val mergedProps = + if (_mergedProps.contains(LabelMeta.timeStampSeq)) _mergedProps + else _mergedProps + (LabelMeta.timeStampSeq -> InnerValLikeWithTs.withLong(version, version, schemaVer)) + + /** process tgtVertexId */ + val tgtVertexId = + mergedProps.get(LabelMeta.toSeq) match { + case None => tgtVertexIdRaw + case Some(vId) => TargetVertexId(HBaseType.DEFAULT_COL_ID, vId.innerVal) + } + + IndexEdge(Vertex(srcVertexId, version), Vertex(tgtVertexId, version), labelWithDir, op, version, labelIdxSeq, mergedProps) + + } +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/0d185445/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeSerializable.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeSerializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeSerializable.scala index 49e95b4..83d4338 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeSerializable.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeSerializable.scala @@ -28,44 +28,40 @@ import org.apache.s2graph.core.{GraphUtil, IndexEdge} class IndexEdgeSerializable(indexEdge: IndexEdge) 
extends Serializable[IndexEdge] { import StorageSerializable._ - val label = indexEdge.label - val table = label.hbaseTableName.getBytes() - val cf = Serializable.edgeCf + override val ts = indexEdge.version + override val table = indexEdge.label.hbaseTableName.getBytes() - val idxPropsMap = indexEdge.orders.toMap - val idxPropsBytes = propsToBytes(indexEdge.orders) + def idxPropsMap = indexEdge.orders.toMap + def idxPropsBytes = propsToBytes(indexEdge.orders) - override def toKeyValues: Seq[SKeyValue] = { + override def toRowKey: Array[Byte] = { val srcIdBytes = VertexId.toSourceVertexId(indexEdge.srcVertex.id).bytes val labelWithDirBytes = indexEdge.labelWithDir.bytes val labelIndexSeqWithIsInvertedBytes = labelOrderSeqWithIsInverted(indexEdge.labelIndexSeq, isInverted = false) - val row = Bytes.add(srcIdBytes, labelWithDirBytes, labelIndexSeqWithIsInvertedBytes) - // logger.error(s"${row.toList}\n${srcIdBytes.toList}\n${labelWithDirBytes.toList}\n${labelIndexSeqWithIsInvertedBytes.toList}") + Bytes.add(srcIdBytes, labelWithDirBytes, labelIndexSeqWithIsInvertedBytes) + } + + override def toQualifier: Array[Byte] = { val tgtIdBytes = VertexId.toTargetVertexId(indexEdge.tgtVertex.id).bytes - val qualifier = - if (indexEdge.degreeEdge) Array.empty[Byte] - else { - if (indexEdge.op == GraphUtil.operations("incrementCount")) { - Bytes.add(idxPropsBytes, tgtIdBytes, Array.fill(1)(indexEdge.op)) - } else { - idxPropsMap.get(LabelMeta.toSeq) match { - case None => Bytes.add(idxPropsBytes, tgtIdBytes) - case Some(vId) => idxPropsBytes - } + if (indexEdge.degreeEdge) Array.empty[Byte] + else { + if (indexEdge.op == GraphUtil.operations("incrementCount")) { + Bytes.add(idxPropsBytes, tgtIdBytes, Array.fill(1)(indexEdge.op)) + } else { + idxPropsMap.get(LabelMeta.toSeq) match { + case None => Bytes.add(idxPropsBytes, tgtIdBytes) + case Some(vId) => idxPropsBytes } } - - - val value = - if (indexEdge.degreeEdge) - 
Bytes.toBytes(indexEdge.props(LabelMeta.degreeSeq).innerVal.toString().toLong) - else if (indexEdge.op == GraphUtil.operations("incrementCount")) - Bytes.toBytes(indexEdge.props(LabelMeta.countSeq).innerVal.toString().toLong) - else propsToKeyValues(indexEdge.metas.toSeq) - - val kv = SKeyValue(table, row, cf, qualifier, value, indexEdge.version) - - Seq(kv) + } } + + override def toValue: Array[Byte] = + if (indexEdge.degreeEdge) + Bytes.toBytes(indexEdge.props(LabelMeta.degreeSeq).innerVal.toString().toLong) + else if (indexEdge.op == GraphUtil.operations("incrementCount")) + Bytes.toBytes(indexEdge.props(LabelMeta.countSeq).innerVal.toString().toLong) + else propsToKeyValues(indexEdge.metas.toSeq) + } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/0d185445/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeSerializable.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeSerializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeSerializable.scala index 716a6b9..4f7c17b 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeSerializable.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeSerializable.scala @@ -29,9 +29,8 @@ import org.apache.s2graph.core.types.SourceAndTargetVertexIdPair class SnapshotEdgeSerializable(snapshotEdge: SnapshotEdge) extends Serializable[SnapshotEdge] { import StorageSerializable._ - val label = snapshotEdge.label - val table = label.hbaseTableName.getBytes() - val cf = Serializable.edgeCf + override val ts = snapshotEdge.version + override val table = snapshotEdge.label.hbaseTableName.getBytes() def statusCodeWithOp(statusCode: Byte, op: Byte): Array[Byte] = { val byte = (((statusCode << 4) | op).toByte) @@ 
-40,16 +39,18 @@ class SnapshotEdgeSerializable(snapshotEdge: SnapshotEdge) extends Serializable[ def valueBytes() = Bytes.add(statusCodeWithOp(snapshotEdge.statusCode, snapshotEdge.op), propsToKeyValuesWithTs(snapshotEdge.props.toList)) - override def toKeyValues: Seq[SKeyValue] = { + override def toRowKey: Array[Byte] = { val srcIdAndTgtIdBytes = SourceAndTargetVertexIdPair(snapshotEdge.srcVertex.innerId, snapshotEdge.tgtVertex.innerId).bytes val labelWithDirBytes = snapshotEdge.labelWithDir.bytes val labelIndexSeqWithIsInvertedBytes = labelOrderSeqWithIsInverted(LabelIndex.DefaultSeq, isInverted = true) - val row = Bytes.add(srcIdAndTgtIdBytes, labelWithDirBytes, labelIndexSeqWithIsInvertedBytes) + Bytes.add(srcIdAndTgtIdBytes, labelWithDirBytes, labelIndexSeqWithIsInvertedBytes) + } - val qualifier = Array.empty[Byte] + override def toQualifier: Array[Byte] = Array.empty[Byte] - val value = snapshotEdge.pendingEdgeOpt match { + override def toValue: Array[Byte] = + snapshotEdge.pendingEdgeOpt match { case None => valueBytes() case Some(pendingEdge) => val opBytes = statusCodeWithOp(pendingEdge.statusCode, pendingEdge.op) @@ -60,7 +61,4 @@ class SnapshotEdgeSerializable(snapshotEdge: SnapshotEdge) extends Serializable[ Bytes.add(Bytes.add(valueBytes(), opBytes, versionBytes), Bytes.add(propsBytes, lockBytes)) } - val kv = SKeyValue(table, row, cf, qualifier, value, snapshotEdge.version) - Seq(kv) - } } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/0d185445/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeSerializable.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeSerializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeSerializable.scala index 2eb2b1b..757ef1b 100644 --- 
a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeSerializable.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeSerializable.scala @@ -34,9 +34,8 @@ import org.apache.s2graph.core.types.VertexId class SnapshotEdgeSerializable(snapshotEdge: SnapshotEdge) extends Serializable[SnapshotEdge] { import StorageSerializable._ - val label = snapshotEdge.label - val table = label.hbaseTableName.getBytes() - val cf = Serializable.edgeCf + override val ts = snapshotEdge.version + override val table = snapshotEdge.label.hbaseTableName.getBytes() def statusCodeWithOp(statusCode: Byte, op: Byte): Array[Byte] = { val byte = (((statusCode << 4) | op).toByte) @@ -45,17 +44,20 @@ class SnapshotEdgeSerializable(snapshotEdge: SnapshotEdge) extends Serializable[ def valueBytes() = Bytes.add(statusCodeWithOp(snapshotEdge.statusCode, snapshotEdge.op), propsToKeyValuesWithTs(snapshotEdge.props.toList)) - override def toKeyValues: Seq[SKeyValue] = { + + override def toRowKey: Array[Byte] = { val srcIdBytes = VertexId.toSourceVertexId(snapshotEdge.srcVertex.id).bytes val labelWithDirBytes = snapshotEdge.labelWithDir.bytes val labelIndexSeqWithIsInvertedBytes = labelOrderSeqWithIsInverted(LabelIndex.DefaultSeq, isInverted = true) - val row = Bytes.add(srcIdBytes, labelWithDirBytes, labelIndexSeqWithIsInvertedBytes) - val tgtIdBytes = VertexId.toTargetVertexId(snapshotEdge.tgtVertex.id).bytes + Bytes.add(srcIdBytes, labelWithDirBytes, labelIndexSeqWithIsInvertedBytes) + } - val qualifier = tgtIdBytes + override def toQualifier: Array[Byte] = + VertexId.toTargetVertexId(snapshotEdge.tgtVertex.id).bytes - val value = snapshotEdge.pendingEdgeOpt match { + override def toValue: Array[Byte] = + snapshotEdge.pendingEdgeOpt match { case None => valueBytes() case Some(pendingEdge) => val opBytes = statusCodeWithOp(pendingEdge.statusCode, pendingEdge.op) @@ -64,7 +66,5 @@ class 
SnapshotEdgeSerializable(snapshotEdge: SnapshotEdge) extends Serializable[ val lockBytes = Bytes.toBytes(pendingEdge.lockTs.get) Bytes.add(Bytes.add(valueBytes(), opBytes, versionBytes), Bytes.add(propsBytes, lockBytes)) } - val kv = SKeyValue(table, row, cf, qualifier, value, snapshotEdge.version) - Seq(kv) - } + } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/0d185445/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/VertexSerializable.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/VertexSerializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/VertexSerializable.scala index a74031a..6bb162c 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/VertexSerializable.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/VertexSerializable.scala @@ -25,10 +25,18 @@ import org.apache.s2graph.core.storage.{SKeyValue, Serializable} case class VertexSerializable(vertex: Vertex) extends Serializable[Vertex] { - val cf = Serializable.vertexCf + override val table = vertex.hbaseTableName.getBytes + override val ts = vertex.ts + override val cf = Serializable.vertexCf + override def toRowKey: Array[Byte] = vertex.id.bytes + + override def toQualifier: Array[Byte] = Array.empty[Byte] + override def toValue: Array[Byte] = Array.empty[Byte] + + /** vertex override toKeyValues since vertex expect to produce multiple sKeyValues */ override def toKeyValues: Seq[SKeyValue] = { - val row = vertex.id.bytes + val row = toRowKey val base = for ((k, v) <- vertex.props ++ vertex.defaultProps) yield Bytes.toBytes(k) -> v.bytes val belongsTo = vertex.belongLabelIds.map { labelId => Bytes.toBytes(Vertex.toPropKey(labelId)) -> Array.empty[Byte] } (base ++ belongsTo).map { case (qualifier, value) =>
