Repository: incubator-s2graph
Updated Branches:
  refs/heads/master 4e40ec7e6 -> 56829adc7
[S2GRAPH-69]: Change IndexEdge's props data type. change props type of IndexEdge from Map[Byte, InnerValLike] to Map[Byte, InnerValLikeWithTs] change caller of IndexEdge's props data according to above type change. JIRA: [S2GRAPH-69] https://issues.apache.org/jira/browse/S2GRAPH-69 Pull Request: Closes #51 Authors: DOYUNG YOON: [email protected] Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/56829adc Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/56829adc Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/56829adc Branch: refs/heads/master Commit: 56829adc7917cbc6afd8f96db036efa237265827 Parents: 4e40ec7 Author: DO YUNG YOON <[email protected]> Authored: Sat Jun 11 23:39:06 2016 +0900 Committer: DO YUNG YOON <[email protected]> Committed: Sat Jun 11 23:39:06 2016 +0900 ---------------------------------------------------------------------- CHANGES | 2 + .../scala/org/apache/s2graph/core/Edge.scala | 43 +++++++++---------- .../apache/s2graph/core/storage/Storage.scala | 4 +- .../tall/IndexEdgeDeserializable.scala | 43 ++++++++++--------- .../indexedge/tall/IndexEdgeSerializable.scala | 4 +- .../wide/IndexEdgeDeserializable.scala | 44 ++++++++++---------- .../indexedge/wide/IndexEdgeSerializable.scala | 4 +- .../s2graph/core/types/InnerValLike.scala | 4 ++ .../core/storage/hbase/IndexEdgeTest.scala | 24 +++++------ 9 files changed, 91 insertions(+), 81 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/56829adc/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index 8ee4761..56f14a0 100644 --- a/CHANGES +++ b/CHANGES @@ -121,6 +121,8 @@ Release 0.12.1 - unreleased S2GRAPH-7: Abstract common codes for rest project into s2core. (Committed by daewon). 
S2GRAPH-31: Remove playframework dependencies on s2core/build.sbt. (Committed by DOYUNG YOON). + + S2GRAPH-69: Change IndexEdge's props data type. (Committed by DOYUNG YOON). S2GRAPH-87: LICENSE file should be changed. (Committed by Sergio Fernández). http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/56829adc/s2core/src/main/scala/org/apache/s2graph/core/Edge.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/Edge.scala b/s2core/src/main/scala/org/apache/s2graph/core/Edge.scala index 1169ba9..979268b 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/Edge.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/Edge.scala @@ -70,14 +70,14 @@ case class IndexEdge(srcVertex: Vertex, op: Byte, version: Long, labelIndexSeq: Byte, - props: Map[Byte, InnerValLike]) extends JSONParser { - if (!props.containsKey(LabelMeta.timeStampSeq)) throw new Exception("Timestamp is required.") + props: Map[Byte, InnerValLikeWithTs]) extends JSONParser { + // if (!props.containsKey(LabelMeta.timeStampSeq)) throw new Exception("Timestamp is required.") // assert(props.containsKey(LabelMeta.timeStampSeq)) - val ts = props(LabelMeta.timeStampSeq).toString.toLong - val degreeEdge = props.contains(LabelMeta.degreeSeq) + lazy val ts = props(LabelMeta.timeStampSeq).innerVal.toString.toLong + lazy val degreeEdge = props.contains(LabelMeta.degreeSeq) lazy val label = Label.findById(labelWithDir.labelId) - val schemaVer = label.schemaVersion + lazy val schemaVer = label.schemaVersion lazy val labelIndex = LabelIndex.findByLabelIdAndSeq(labelWithDir.labelId, labelIndexSeq).get lazy val defaultIndexMetas = labelIndex.sortKeyTypes.map { meta => val innerVal = toInnerVal(meta.defaultValue, meta.dataType, schemaVer) @@ -92,9 +92,9 @@ case class IndexEdge(srcVertex: Vertex, case None => /** - * TODO: agly hack - * now we double store target vertex.innerId/srcVertex.innerId for easy development. 
later fix this to only store id once - */ + * TODO: agly hack + * now we double store target vertex.innerId/srcVertex.innerId for easy development. later fix this to only store id once + */ val v = k match { case LabelMeta.timeStampSeq => InnerVal.withLong(version, schemaVer) case LabelMeta.toSeq => tgtVertex.innerId @@ -105,14 +105,14 @@ case class IndexEdge(srcVertex: Vertex, } k -> v - case Some(v) => k -> v + case Some(v) => k -> v.innerVal } } lazy val ordersKeyMap = orders.map { case (byte, _) => byte }.toSet - lazy val metas = for ((k, v) <- props if !ordersKeyMap.contains(k)) yield k -> v + lazy val metas = for ((k, v) <- props if !ordersKeyMap.contains(k)) yield k -> v.innerVal - lazy val propsWithTs = props.map { case (k, v) => k -> InnerValLikeWithTs(v, version) } + // lazy val propsWithTs = props.map { case (k, v) => k -> InnerValLikeWithTs(v, version) } //TODO: // lazy val kvs = Graph.client.indexedEdgeSerializer(this).toKeyValues.toList @@ -122,11 +122,11 @@ case class IndexEdge(srcVertex: Vertex, def propsWithName = for { (seq, v) <- props meta <- label.metaPropsMap.get(seq) if seq >= 0 - jsValue <- innerValToJsValue(v, meta.dataType) + jsValue <- innerValToJsValue(v.innerVal, meta.dataType) } yield meta.name -> jsValue - def toEdge: Edge = Edge(srcVertex, tgtVertex, labelWithDir, op, version, propsWithTs) + def toEdge: Edge = Edge(srcVertex, tgtVertex, labelWithDir, op, version, props) // only for debug def toLogString() = { @@ -154,8 +154,9 @@ case class Edge(srcVertex: Vertex, def props = propsWithTs.mapValues(_.innerVal) def relatedEdges = { - if (labelWithDir.isDirected) List(this, duplicateEdge) - else { + if (labelWithDir.isDirected) { + List(this, duplicateEdge) + } else { val outDir = labelWithDir.copy(dir = GraphUtil.directions("out")) val base = copy(labelWithDir = outDir) List(base, base.reverseSrcTgtEdge) @@ -202,15 +203,15 @@ case class Edge(srcVertex: Vertex, def isDegree = propsWithTs.contains(LabelMeta.degreeSeq) - def propsPlusTs = 
propsWithTs.get(LabelMeta.timeStampSeq) match { - case Some(_) => props - case None => props ++ Map(LabelMeta.timeStampSeq -> InnerVal.withLong(ts, schemaVer)) - } + // def propsPlusTs = propsWithTs.get(LabelMeta.timeStampSeq) match { + // case Some(_) => props + // case None => props ++ Map(LabelMeta.timeStampSeq -> InnerVal.withLong(ts, schemaVer)) + // } - def propsPlusTsValid = propsPlusTs.filter(kv => kv._1 >= 0) + def propsPlusTsValid = propsWithTs.filter(kv => kv._1 >= 0) def edgesWithIndex = for (labelOrder <- labelOrders) yield { - IndexEdge(srcVertex, tgtVertex, labelWithDir, op, version, labelOrder.seq, propsPlusTs) + IndexEdge(srcVertex, tgtVertex, labelWithDir, op, version, labelOrder.seq, propsWithTs) } def edgesWithIndexValid = for (labelOrder <- labelOrders) yield { http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/56829adc/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala index f5cd9b2..92a93bd 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala @@ -1213,13 +1213,13 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) { /** IndexEdge */ def buildIncrementsAsync(indexedEdge: IndexEdge, amount: Long = 1L): Seq[SKeyValue] = { - val newProps = indexedEdge.props ++ Map(LabelMeta.degreeSeq -> InnerVal.withLong(amount, indexedEdge.schemaVer)) + val newProps = indexedEdge.props ++ Map(LabelMeta.degreeSeq -> InnerValLikeWithTs.withLong(amount, indexedEdge.ts, indexedEdge.schemaVer)) val _indexedEdge = indexedEdge.copy(props = newProps) indexEdgeSerializer(_indexedEdge).toKeyValues.map(_.copy(operation = SKeyValue.Increment)) } def buildIncrementsCountAsync(indexedEdge: IndexEdge, 
amount: Long = 1L): Seq[SKeyValue] = { - val newProps = indexedEdge.props ++ Map(LabelMeta.countSeq -> InnerVal.withLong(amount, indexedEdge.schemaVer)) + val newProps = indexedEdge.props ++ Map(LabelMeta.countSeq -> InnerValLikeWithTs.withLong(amount, indexedEdge.ts, indexedEdge.schemaVer)) val _indexedEdge = indexedEdge.copy(props = newProps) indexEdgeSerializer(_indexedEdge).toKeyValues.map(_.copy(operation = SKeyValue.Increment)) } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/56829adc/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala index da029ef..e80a805 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala @@ -25,6 +25,7 @@ import org.apache.s2graph.core.storage.StorageDeserializable._ import org.apache.s2graph.core.storage.{CanSKeyValue, Deserializable, SKeyValue, StorageDeserializable} import org.apache.s2graph.core.types._ import org.apache.s2graph.core.{GraphUtil, IndexEdge, QueryParam, Vertex} +import scala.collection.immutable import scala.collection.immutable @@ -77,7 +78,7 @@ class IndexEdgeDeserializable(bytesToLongFunc: (Array[Byte], Int) => Long = byte override def fromKeyValuesInner[T: CanSKeyValue](queryParam: QueryParam, _kvs: Seq[T], - version: String, + schemaVer: String, cacheElementOpt: Option[IndexEdge]): IndexEdge = { assert(_kvs.size == 1) @@ -85,10 +86,10 @@ class IndexEdgeDeserializable(bytesToLongFunc: (Array[Byte], Int) => Long = byte val kvs = _kvs.map { kv => 
implicitly[CanSKeyValue[T]].toSKeyValue(kv) } val kv = kvs.head - + val version = kv.timestamp // logger.debug(s"[Des]: ${kv.row.toList}, ${kv.qualifier.toList}, ${kv.value.toList}") var pos = 0 - val (srcVertexId, srcIdLen) = SourceVertexId.fromBytes(kv.row, pos, kv.row.length, version) + val (srcVertexId, srcIdLen) = SourceVertexId.fromBytes(kv.row, pos, kv.row.length, schemaVer) pos += srcIdLen val labelWithDir = LabelWithDirection(Bytes.toInt(kv.row, pos, 4)) pos += 4 @@ -103,55 +104,59 @@ class IndexEdgeDeserializable(bytesToLongFunc: (Array[Byte], Int) => Long = byte // val degreeVal = Bytes.toLong(kv.value) val degreeVal = bytesToLongFunc(kv.value, 0) val ts = kv.timestamp - val props = Map(LabelMeta.timeStampSeq -> InnerVal.withLong(ts, version), - LabelMeta.degreeSeq -> InnerVal.withLong(degreeVal, version)) - val tgtVertexId = VertexId(HBaseType.DEFAULT_COL_ID, InnerVal.withStr("0", version)) + val props = Map(LabelMeta.timeStampSeq -> InnerValLikeWithTs.withLong(ts, ts, schemaVer), + LabelMeta.degreeSeq -> InnerValLikeWithTs.withLong(degreeVal, ts, schemaVer)) + val tgtVertexId = VertexId(HBaseType.DEFAULT_COL_ID, InnerVal.withStr("0", schemaVer)) IndexEdge(Vertex(srcVertexId, ts), Vertex(tgtVertexId, ts), labelWithDir, op, ts, labelIdxSeq, props) } else { // not degree edge - val (idxPropsRaw, endAt) = bytesToProps(kv.row, pos, version) + + val (idxPropsRaw, endAt) = bytesToProps(kv.row, pos, schemaVer) pos = endAt val (tgtVertexIdRaw, tgtVertexIdLen) = if (endAt == kv.row.length) { (HBaseType.defaultTgtVertexId, 0) } else { - TargetVertexId.fromBytes(kv.row, endAt, kv.row.length, version) + TargetVertexId.fromBytes(kv.row, endAt, kv.row.length, schemaVer) } - val allProps = immutable.Map.newBuilder[Byte, InnerValLike] - /** process index props */ + + val allProps = immutable.Map.newBuilder[Byte, InnerValLikeWithTs] val index = queryParam.label.indicesMap.getOrElse(labelIdxSeq, throw new RuntimeException(s"invalid index seq: ${queryParam.label.id.get}, 
${labelIdxSeq}")) + /** process indexProps */ for { (seq, (k, v)) <- index.metaSeqs.zip(idxPropsRaw) } { - if (k == LabelMeta.degreeSeq) allProps += k -> v - else allProps += seq -> v + if (k == LabelMeta.degreeSeq) allProps += k -> InnerValLikeWithTs(v, version) + else allProps += seq -> InnerValLikeWithTs(v, version) } + /** process props */ if (op == GraphUtil.operations("incrementCount")) { // val countVal = Bytes.toLong(kv.value) val countVal = bytesToLongFunc(kv.value, 0) - allProps += (LabelMeta.countSeq -> InnerVal.withLong(countVal, version)) + allProps += (LabelMeta.countSeq -> InnerValLikeWithTs.withLong(countVal, version, schemaVer)) } else { - val (props, endAt) = bytesToKeyValues(kv.value, 0, kv.value.length, version) + val (props, endAt) = bytesToKeyValues(kv.value, 0, kv.value.length, schemaVer) props.foreach { case (k, v) => - allProps += (k -> v) + allProps += (k -> InnerValLikeWithTs(v, version)) } } val _mergedProps = allProps.result() val mergedProps = if (_mergedProps.contains(LabelMeta.timeStampSeq)) _mergedProps - else _mergedProps + (LabelMeta.timeStampSeq -> InnerVal.withLong(kv.timestamp, version)) + else _mergedProps + (LabelMeta.timeStampSeq -> InnerValLikeWithTs.withLong(version, version, schemaVer)) + /** process tgtVertexId */ val tgtVertexId = mergedProps.get(LabelMeta.toSeq) match { case None => tgtVertexIdRaw - case Some(vId) => TargetVertexId(HBaseType.DEFAULT_COL_ID, vId) + case Some(vId) => TargetVertexId(HBaseType.DEFAULT_COL_ID, vId.innerVal) } - val ts = kv.timestamp - IndexEdge(Vertex(srcVertexId, ts), Vertex(tgtVertexId, ts), labelWithDir, op, ts, labelIdxSeq, mergedProps) + + IndexEdge(Vertex(srcVertexId, version), Vertex(tgtVertexId, version), labelWithDir, op, version, labelIdxSeq, mergedProps) } } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/56829adc/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeSerializable.scala 
---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeSerializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeSerializable.scala index 116412c..d00877e 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeSerializable.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeSerializable.scala @@ -59,9 +59,9 @@ class IndexEdgeSerializable(indexEdge: IndexEdge) extends Serializable[IndexEdge val value = if (indexEdge.degreeEdge) - Bytes.toBytes(indexEdge.propsWithTs(LabelMeta.degreeSeq).innerVal.toString().toLong) + Bytes.toBytes(indexEdge.props(LabelMeta.degreeSeq).innerVal.toString().toLong) else if (indexEdge.op == GraphUtil.operations("incrementCount")) - Bytes.toBytes(indexEdge.propsWithTs(LabelMeta.countSeq).innerVal.toString().toLong) + Bytes.toBytes(indexEdge.props(LabelMeta.countSeq).innerVal.toString().toLong) else propsToKeyValues(indexEdge.metas.toSeq) val kv = SKeyValue(table, rowBytes, cf, qualifierBytes, value, indexEdge.version) http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/56829adc/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala index 20770c0..b1b933f 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala @@ -24,6 +24,7 @@ import 
org.apache.s2graph.core.storage.StorageDeserializable._ import org.apache.s2graph.core.storage._ import org.apache.s2graph.core.types._ import org.apache.s2graph.core.{GraphUtil, IndexEdge, QueryParam, Vertex} +import scala.collection.immutable import scala.collection.immutable @@ -76,63 +77,60 @@ class IndexEdgeDeserializable(bytesToLongFunc: (Array[Byte], Int) => Long = byte override def fromKeyValuesInner[T: CanSKeyValue](queryParam: QueryParam, _kvs: Seq[T], - version: String, + schemaVer: String, cacheElementOpt: Option[IndexEdge]): IndexEdge = { assert(_kvs.size == 1) val kvs = _kvs.map { kv => implicitly[CanSKeyValue[T]].toSKeyValue(kv) } val kv = kvs.head + val version = kv.timestamp + val (srcVertexId, labelWithDir, labelIdxSeq, _, _) = cacheElementOpt.map { e => (e.srcVertex.id, e.labelWithDir, e.labelIndexSeq, false, 0) - }.getOrElse(parseRow(kv, version)) + }.getOrElse(parseRow(kv, schemaVer)) val (idxPropsRaw, tgtVertexIdRaw, op, tgtVertexIdInQualifier, _) = - if (kv.qualifier.isEmpty) parseDegreeQualifier(kv, version) - else parseQualifier(kv, version) - - val allProps = immutable.Map.newBuilder[Byte, InnerValLike] + if (kv.qualifier.isEmpty) parseDegreeQualifier(kv, schemaVer) + else parseQualifier(kv, schemaVer) - /** process index props */ + val allProps = immutable.Map.newBuilder[Byte, InnerValLikeWithTs] val index = queryParam.label.indicesMap.getOrElse(labelIdxSeq, throw new RuntimeException(s"invalid index seq: ${queryParam.label.id.get}, ${labelIdxSeq}")) + + /** process indexProps */ for { (seq, (k, v)) <- index.metaSeqs.zip(idxPropsRaw) } { - if (k == LabelMeta.degreeSeq) allProps += k -> v - else allProps += seq -> v + if (k == LabelMeta.degreeSeq) allProps += k -> InnerValLikeWithTs(v, version) + else allProps += seq -> InnerValLikeWithTs(v, version) } /** process props */ if (op == GraphUtil.operations("incrementCount")) { // val countVal = Bytes.toLong(kv.value) val countVal = bytesToLongFunc(kv.value, 0) - allProps += 
(LabelMeta.countSeq -> InnerVal.withLong(countVal, version)) + allProps += (LabelMeta.countSeq -> InnerValLikeWithTs.withLong(countVal, version, schemaVer)) } else if (kv.qualifier.isEmpty) { val countVal = bytesToLongFunc(kv.value, 0) - allProps += (LabelMeta.degreeSeq -> InnerVal.withLong(countVal, version)) + allProps += (LabelMeta.degreeSeq -> InnerValLikeWithTs.withLong(countVal, version, schemaVer)) } else { - val (props, endAt) = bytesToKeyValues(kv.value, 0, kv.value.length, version) - props.foreach { case (k, v) => - allProps += (k -> v) - } + val (props, _) = bytesToKeyValues(kv.value, 0, kv.value.length, schemaVer) + props.foreach { case (k, v) => allProps += (k -> InnerValLikeWithTs(v, version)) } } val _mergedProps = allProps.result() val mergedProps = if (_mergedProps.contains(LabelMeta.timeStampSeq)) _mergedProps - else _mergedProps + (LabelMeta.timeStampSeq -> InnerVal.withLong(kv.timestamp, version)) + else _mergedProps + (LabelMeta.timeStampSeq -> InnerValLikeWithTs.withLong(version, version, schemaVer)) - val tgtVertexId = if (tgtVertexIdInQualifier) { + /** process tgtVertexId */ + val tgtVertexId = mergedProps.get(LabelMeta.toSeq) match { case None => tgtVertexIdRaw - case Some(vId) => TargetVertexId(HBaseType.DEFAULT_COL_ID, vId) + case Some(vId) => TargetVertexId(HBaseType.DEFAULT_COL_ID, vId.innerVal) } - } else tgtVertexIdRaw - // logger.error(s"$mergedProps") - // val ts = mergedProps(LabelMeta.timeStampSeq).toString().toLong - val ts = kv.timestamp - IndexEdge(Vertex(srcVertexId, ts), Vertex(tgtVertexId, ts), labelWithDir, op, ts, labelIdxSeq, mergedProps) + IndexEdge(Vertex(srcVertexId, version), Vertex(tgtVertexId, version), labelWithDir, op, version, labelIdxSeq, mergedProps) } } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/56829adc/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeSerializable.scala ---------------------------------------------------------------------- diff --git 
a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeSerializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeSerializable.scala index 189de22..49e95b4 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeSerializable.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeSerializable.scala @@ -59,9 +59,9 @@ class IndexEdgeSerializable(indexEdge: IndexEdge) extends Serializable[IndexEdge val value = if (indexEdge.degreeEdge) - Bytes.toBytes(indexEdge.propsWithTs(LabelMeta.degreeSeq).innerVal.toString().toLong) + Bytes.toBytes(indexEdge.props(LabelMeta.degreeSeq).innerVal.toString().toLong) else if (indexEdge.op == GraphUtil.operations("incrementCount")) - Bytes.toBytes(indexEdge.propsWithTs(LabelMeta.countSeq).innerVal.toString().toLong) + Bytes.toBytes(indexEdge.props(LabelMeta.countSeq).innerVal.toString().toLong) else propsToKeyValues(indexEdge.metas.toSeq) val kv = SKeyValue(table, row, cf, qualifier, value, indexEdge.version) http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/56829adc/s2core/src/main/scala/org/apache/s2graph/core/types/InnerValLike.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/types/InnerValLike.scala b/s2core/src/main/scala/org/apache/s2graph/core/types/InnerValLike.scala index 09d065f..d60641f 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/types/InnerValLike.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/types/InnerValLike.scala @@ -248,6 +248,10 @@ object InnerValLikeWithTs extends HBaseDeserializable { InnerValLikeWithTs(InnerVal.withLong(l, version), ts) } + def withDouble(d: Double, ts: Long, version: String): InnerValLikeWithTs = { + InnerValLikeWithTs(InnerVal.withDouble(d, version), ts) + } + def withStr(s: String, ts: Long, version: String): 
InnerValLikeWithTs = { InnerValLikeWithTs(InnerVal.withStr(s, version), ts) } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/56829adc/s2core/src/test/scala/org/apache/s2graph/core/storage/hbase/IndexEdgeTest.scala ---------------------------------------------------------------------- diff --git a/s2core/src/test/scala/org/apache/s2graph/core/storage/hbase/IndexEdgeTest.scala b/s2core/src/test/scala/org/apache/s2graph/core/storage/hbase/IndexEdgeTest.scala index 5edd5b1..ea90061 100644 --- a/s2core/src/test/scala/org/apache/s2graph/core/storage/hbase/IndexEdgeTest.scala +++ b/s2core/src/test/scala/org/apache/s2graph/core/storage/hbase/IndexEdgeTest.scala @@ -35,7 +35,7 @@ class IndexEdgeTest extends FunSuite with Matchers with TestCommonWithModels { * @param to: to VertexId for edge. * @param props: expected props of edge. */ - def check(l: Label, ts: Long, to: InnerValLike, props: Map[Byte, InnerValLike]): Unit = { + def check(l: Label, ts: Long, to: InnerValLike, props: Map[Byte, InnerValLikeWithTs]): Unit = { val from = InnerVal.withLong(1, l.schemaVersion) val vertexId = SourceVertexId(HBaseType.DEFAULT_COL_ID, from) val tgtVertexId = TargetVertexId(HBaseType.DEFAULT_COL_ID, to) @@ -59,9 +59,9 @@ class IndexEdgeTest extends FunSuite with Matchers with TestCommonWithModels { l <- Seq(label, labelV2, labelV3, labelV4) } { val to = InnerVal.withLong(101, l.schemaVersion) - val tsInnerVal = InnerVal.withLong(ts, l.schemaVersion) - val props = Map(LabelMeta.timeStampSeq -> tsInnerVal, - 1.toByte -> InnerVal.withDouble(2.1, l.schemaVersion)) + val tsInnerValWithTs = InnerValLikeWithTs.withLong(ts, ts, l.schemaVersion) + val props = Map(LabelMeta.timeStampSeq -> tsInnerValWithTs, + 1.toByte -> InnerValLikeWithTs.withDouble(2.1, ts, l.schemaVersion)) check(l, ts, to, props) } @@ -73,10 +73,10 @@ class IndexEdgeTest extends FunSuite with Matchers with TestCommonWithModels { l <- Seq(label, labelV2, labelV3, labelV4) } { val to = InnerVal.withStr("0", 
l.schemaVersion) - val tsInnerVal = InnerVal.withLong(ts, l.schemaVersion) + val tsInnerValWithTs = InnerValLikeWithTs.withLong(ts, ts, l.schemaVersion) val props = Map( - LabelMeta.degreeSeq -> InnerVal.withLong(10, l.schemaVersion), - LabelMeta.timeStampSeq -> tsInnerVal) + LabelMeta.degreeSeq -> InnerValLikeWithTs.withLong(10, ts, l.schemaVersion), + LabelMeta.timeStampSeq -> tsInnerValWithTs) check(l, ts, to, props) } @@ -88,13 +88,13 @@ class IndexEdgeTest extends FunSuite with Matchers with TestCommonWithModels { l <- Seq(label, labelV2, labelV3, labelV4) } { val to = InnerVal.withLong(101, l.schemaVersion) + val tsInnerValWithTs = InnerValLikeWithTs.withLong(ts, ts, l.schemaVersion) + val props = Map(LabelMeta.timeStampSeq -> tsInnerValWithTs, + 1.toByte -> InnerValLikeWithTs.withDouble(2.1, ts, l.schemaVersion), + LabelMeta.countSeq -> InnerValLikeWithTs.withLong(10, ts, l.schemaVersion)) - val tsInnerVal = InnerVal.withLong(ts, l.schemaVersion) - val props = Map(LabelMeta.timeStampSeq -> tsInnerVal, - 1.toByte -> InnerVal.withDouble(2.1, l.schemaVersion), - LabelMeta.countSeq -> InnerVal.withLong(10, l.schemaVersion)) check(l, ts, to, props) } } -} +} \ No newline at end of file
