Repository: incubator-s2graph Updated Branches: refs/heads/master fabb212dd -> 4e40ec7e6
[S2GRAPH-66]: Optimize toEdge, IndexEdgeDeserializable using mutable Map. use immutable.Map.newBuilder to build merged props. JIRA: [S2GRAPH-66] https://issues.apache.org/jira/browse/S2GRAPH-66 Pull Request: Closes #50 Authors: DOYUNG YOON: [email protected] Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/4e40ec7e Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/4e40ec7e Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/4e40ec7e Branch: refs/heads/master Commit: 4e40ec7e6a41acd8e46c0815bdf1fe35098dd360 Parents: fabb212 Author: DO YUNG YOON <[email protected]> Authored: Sat Jun 11 23:16:31 2016 +0900 Committer: DO YUNG YOON <[email protected]> Committed: Sat Jun 11 23:16:31 2016 +0900 ---------------------------------------------------------------------- CHANGES | 2 + .../tall/IndexEdgeDeserializable.scala | 43 ++++++++++------- .../wide/IndexEdgeDeserializable.scala | 51 +++++++++++--------- 3 files changed, 54 insertions(+), 42 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/4e40ec7e/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index a7f7512..8ee4761 100644 --- a/CHANGES +++ b/CHANGES @@ -65,6 +65,8 @@ Release 0.12.1 - unreleased (Committed by DOYUNG YOON). S2GRAPH-55: Add param to enable epoll event loop in experimental netty http server (Committed by daewon). + + S2GRAPH-66: Optimize toEdge, IndexEdgeDeserializable using mutable Map. (Committed by DOYUNG YOON). S2GRAPH-22: Add missing shebang line to file. 
(Contributed by Injun Song<[email protected]>, committed by DOYUNG YOON) http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/4e40ec7e/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala index f233940..da029ef 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala @@ -26,6 +26,8 @@ import org.apache.s2graph.core.storage.{CanSKeyValue, Deserializable, SKeyValue, import org.apache.s2graph.core.types._ import org.apache.s2graph.core.{GraphUtil, IndexEdge, QueryParam, Vertex} +import scala.collection.immutable + class IndexEdgeDeserializable(bytesToLongFunc: (Array[Byte], Int) => Long = bytesToLong) extends Deserializable[IndexEdge] { import StorageDeserializable._ @@ -107,7 +109,6 @@ class IndexEdgeDeserializable(bytesToLongFunc: (Array[Byte], Int) => Long = byte IndexEdge(Vertex(srcVertexId, ts), Vertex(tgtVertexId, ts), labelWithDir, op, ts, labelIdxSeq, props) } else { // not degree edge - val index = queryParam.label.indicesMap.getOrElse(labelIdxSeq, throw new RuntimeException(s"invalid index seq: ${queryParam.label.id.get}, ${labelIdxSeq}")) val (idxPropsRaw, endAt) = bytesToProps(kv.row, pos, version) pos = endAt @@ -117,32 +118,38 @@ class IndexEdgeDeserializable(bytesToLongFunc: (Array[Byte], Int) => Long = byte TargetVertexId.fromBytes(kv.row, endAt, kv.row.length, version) } - val idxProps = for { - (seq, (k, v)) <- index.metaSeqs.zip(idxPropsRaw) - } yield if (k == LabelMeta.degreeSeq) k -> v else seq -> v - - val idxPropsMap = 
idxProps.toMap - - val tgtVertexId = - idxPropsMap.get(LabelMeta.toSeq) match { - case None => tgtVertexIdRaw - case Some(vId) => TargetVertexId(HBaseType.DEFAULT_COL_ID, vId) - } + val allProps = immutable.Map.newBuilder[Byte, InnerValLike] + /** process index props */ + val index = queryParam.label.indicesMap.getOrElse(labelIdxSeq, throw new RuntimeException(s"invalid index seq: ${queryParam.label.id.get}, ${labelIdxSeq}")) - val (props, _) = if (op == GraphUtil.operations("incrementCount")) { + for { + (seq, (k, v)) <- index.metaSeqs.zip(idxPropsRaw) + } { + if (k == LabelMeta.degreeSeq) allProps += k -> v + else allProps += seq -> v + } + /** process props */ + if (op == GraphUtil.operations("incrementCount")) { // val countVal = Bytes.toLong(kv.value) val countVal = bytesToLongFunc(kv.value, 0) - val dummyProps = Array(LabelMeta.countSeq -> InnerVal.withLong(countVal, version)) - (dummyProps, 8) + allProps += (LabelMeta.countSeq -> InnerVal.withLong(countVal, version)) } else { - bytesToKeyValues(kv.value, 0, kv.value.length, version) + val (props, endAt) = bytesToKeyValues(kv.value, 0, kv.value.length, version) + props.foreach { case (k, v) => + allProps += (k -> v) + } } - - val _mergedProps = (idxProps ++ props).toMap + val _mergedProps = allProps.result() val mergedProps = if (_mergedProps.contains(LabelMeta.timeStampSeq)) _mergedProps else _mergedProps + (LabelMeta.timeStampSeq -> InnerVal.withLong(kv.timestamp, version)) + val tgtVertexId = + mergedProps.get(LabelMeta.toSeq) match { + case None => tgtVertexIdRaw + case Some(vId) => TargetVertexId(HBaseType.DEFAULT_COL_ID, vId) + } + val ts = kv.timestamp IndexEdge(Vertex(srcVertexId, ts), Vertex(tgtVertexId, ts), labelWithDir, op, ts, labelIdxSeq, mergedProps) http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/4e40ec7e/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala ---------------------------------------------------------------------- 
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala index d8bef97..20770c0 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala @@ -25,6 +25,8 @@ import org.apache.s2graph.core.storage._ import org.apache.s2graph.core.types._ import org.apache.s2graph.core.{GraphUtil, IndexEdge, QueryParam, Vertex} +import scala.collection.immutable + class IndexEdgeDeserializable(bytesToLongFunc: (Array[Byte], Int) => Long = bytesToLong) extends Deserializable[IndexEdge] { import StorageDeserializable._ @@ -89,42 +91,43 @@ class IndexEdgeDeserializable(bytesToLongFunc: (Array[Byte], Int) => Long = byte if (kv.qualifier.isEmpty) parseDegreeQualifier(kv, version) else parseQualifier(kv, version) - val (props, _) = if (op == GraphUtil.operations("incrementCount")) { + val allProps = immutable.Map.newBuilder[Byte, InnerValLike] + + /** process index props */ + val index = queryParam.label.indicesMap.getOrElse(labelIdxSeq, throw new RuntimeException(s"invalid index seq: ${queryParam.label.id.get}, ${labelIdxSeq}")) + for { + (seq, (k, v)) <- index.metaSeqs.zip(idxPropsRaw) + } { + if (k == LabelMeta.degreeSeq) allProps += k -> v + else allProps += seq -> v + } + + /** process props */ + if (op == GraphUtil.operations("incrementCount")) { // val countVal = Bytes.toLong(kv.value) val countVal = bytesToLongFunc(kv.value, 0) - val dummyProps = Array(LabelMeta.countSeq -> InnerVal.withLong(countVal, version)) - (dummyProps, 8) + allProps += (LabelMeta.countSeq -> InnerVal.withLong(countVal, version)) } else if (kv.qualifier.isEmpty) { - parseDegreeValue(kv, version) + val countVal = bytesToLongFunc(kv.value, 0) + allProps += (LabelMeta.degreeSeq -> 
InnerVal.withLong(countVal, version)) } else { - parseValue(kv, version) + val (props, endAt) = bytesToKeyValues(kv.value, 0, kv.value.length, version) + props.foreach { case (k, v) => + allProps += (k -> v) + } } - val index = queryParam.label.indicesMap.getOrElse(labelIdxSeq, throw new RuntimeException(s"invalid index seq: ${queryParam.label.id.get}, ${labelIdxSeq}")) - - - // assert(kv.qualifier.nonEmpty && index.metaSeqs.size == idxPropsRaw.size) - - val idxProps = for { - (seq, (k, v)) <- index.metaSeqs.zip(idxPropsRaw) - } yield { - if (k == LabelMeta.degreeSeq) k -> v - else seq -> v - } + val _mergedProps = allProps.result() + val mergedProps = + if (_mergedProps.contains(LabelMeta.timeStampSeq)) _mergedProps + else _mergedProps + (LabelMeta.timeStampSeq -> InnerVal.withLong(kv.timestamp, version)) - val idxPropsMap = idxProps.toMap val tgtVertexId = if (tgtVertexIdInQualifier) { - idxPropsMap.get(LabelMeta.toSeq) match { + mergedProps.get(LabelMeta.toSeq) match { case None => tgtVertexIdRaw case Some(vId) => TargetVertexId(HBaseType.DEFAULT_COL_ID, vId) } } else tgtVertexIdRaw - - val _mergedProps = (idxProps ++ props).toMap - val mergedProps = - if (_mergedProps.contains(LabelMeta.timeStampSeq)) _mergedProps - else _mergedProps + (LabelMeta.timeStampSeq -> InnerVal.withLong(kv.timestamp, version)) - // logger.error(s"$mergedProps") // val ts = mergedProps(LabelMeta.timeStampSeq).toString().toLong
