http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeSerializable.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeSerializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeSerializable.scala new file mode 100644 index 0000000..dacc37f --- /dev/null +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeSerializable.scala @@ -0,0 +1,53 @@ +package org.apache.s2graph.core.storage.serde.indexedge.tall + +import org.apache.hadoop.hbase.util.Bytes +import org.apache.s2graph.core.mysqls.LabelMeta +import org.apache.s2graph.core.storage.{SKeyValue, Serializable, StorageSerializable} +import org.apache.s2graph.core.types.VertexId +import org.apache.s2graph.core.{GraphUtil, IndexEdge} + + +class IndexEdgeSerializable(indexEdge: IndexEdge) extends Serializable[IndexEdge] { + import StorageSerializable._ + + val label = indexEdge.label + val table = label.hbaseTableName.getBytes() + val cf = Serializable.edgeCf + + val idxPropsMap = indexEdge.orders.toMap + val idxPropsBytes = propsToBytes(indexEdge.orders) + + override def toKeyValues: Seq[SKeyValue] = { + val srcIdBytes = VertexId.toSourceVertexId(indexEdge.srcVertex.id).bytes + val labelWithDirBytes = indexEdge.labelWithDir.bytes + val labelIndexSeqWithIsInvertedBytes = labelOrderSeqWithIsInverted(indexEdge.labelIndexSeq, isInverted = false) + + val row = Bytes.add(srcIdBytes, labelWithDirBytes, labelIndexSeqWithIsInvertedBytes) + // logger.error(s"${row.toList}\n${srcIdBytes.toList}\n${labelWithDirBytes.toList}\n${labelIndexSeqWithIsInvertedBytes.toList}") + + val qualifier = + if (indexEdge.degreeEdge) Array.empty[Byte] + else + idxPropsMap.get(LabelMeta.toSeq) match { + case None => Bytes.add(idxPropsBytes, 
VertexId.toTargetVertexId(indexEdge.tgtVertex.id).bytes) + case Some(vId) => idxPropsBytes + } + + /** TODO search usage of op byte. if there is no, then remove opByte */ + val rowBytes = Bytes.add(row, Array.fill(1)(GraphUtil.defaultOpByte), qualifier) + // val qualifierBytes = Array.fill(1)(indexEdge.op) + val qualifierBytes = Array.empty[Byte] + + val value = + if (indexEdge.degreeEdge) + Bytes.toBytes(indexEdge.propsWithTs(LabelMeta.degreeSeq).innerVal.toString().toLong) + else if (indexEdge.op == GraphUtil.operations("incrementCount")) + Bytes.toBytes(indexEdge.propsWithTs(LabelMeta.countSeq).innerVal.toString().toLong) + else propsToKeyValues(indexEdge.metas.toSeq) + + val kv = SKeyValue(table, rowBytes, cf, qualifierBytes, value, indexEdge.version) + + // logger.debug(s"[Ser]: ${kv.row.toList}, ${kv.qualifier.toList}, ${kv.value.toList}") + Seq(kv) + } + }
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala new file mode 100644 index 0000000..8b540cd --- /dev/null +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala @@ -0,0 +1,116 @@ +package org.apache.s2graph.core.storage.serde.indexedge.wide + +import org.apache.s2graph.core.mysqls.LabelMeta +import org.apache.s2graph.core.storage.StorageDeserializable._ +import org.apache.s2graph.core.storage._ +import org.apache.s2graph.core.types._ +import org.apache.s2graph.core.{GraphUtil, IndexEdge, QueryParam, Vertex} + +class IndexEdgeDeserializable(bytesToLongFunc: (Array[Byte], Int) => Long = bytesToLong) extends Deserializable[IndexEdge] { + import StorageDeserializable._ + + type QualifierRaw = (Array[(Byte, InnerValLike)], VertexId, Byte, Boolean, Int) + type ValueRaw = (Array[(Byte, InnerValLike)], Int) + + private def parseDegreeQualifier(kv: SKeyValue, version: String): QualifierRaw = { + // val degree = Bytes.toLong(kv.value) + val degree = bytesToLongFunc(kv.value, 0) + val idxPropsRaw = Array(LabelMeta.degreeSeq -> InnerVal.withLong(degree, version)) + val tgtVertexIdRaw = VertexId(HBaseType.DEFAULT_COL_ID, InnerVal.withStr("0", version)) + (idxPropsRaw, tgtVertexIdRaw, GraphUtil.operations("insert"), false, 0) + } + + private def parseQualifier(kv: SKeyValue, version: String): QualifierRaw = { + var qualifierLen = 0 + var pos = 0 + val (idxPropsRaw, idxPropsLen, tgtVertexIdRaw, tgtVertexIdLen) = { + val (props, endAt) = bytesToProps(kv.qualifier, pos, version) + pos = endAt + qualifierLen += 
endAt + val (tgtVertexId, tgtVertexIdLen) = if (endAt == kv.qualifier.length) { + (HBaseType.defaultTgtVertexId, 0) + } else { + TargetVertexId.fromBytes(kv.qualifier, endAt, kv.qualifier.length, version) + } + qualifierLen += tgtVertexIdLen + (props, endAt, tgtVertexId, tgtVertexIdLen) + } + val (op, opLen) = + if (kv.qualifier.length == qualifierLen) (GraphUtil.defaultOpByte, 0) + else (kv.qualifier(qualifierLen), 1) + + qualifierLen += opLen + + (idxPropsRaw, tgtVertexIdRaw, op, tgtVertexIdLen != 0, qualifierLen) + } + + private def parseValue(kv: SKeyValue, version: String): ValueRaw = { + val (props, endAt) = bytesToKeyValues(kv.value, 0, kv.value.length, version) + (props, endAt) + } + + private def parseDegreeValue(kv: SKeyValue, version: String): ValueRaw = { + (Array.empty[(Byte, InnerValLike)], 0) + } + + override def fromKeyValuesInner[T: CanSKeyValue](queryParam: QueryParam, + _kvs: Seq[T], + version: String, + cacheElementOpt: Option[IndexEdge]): IndexEdge = { + assert(_kvs.size == 1) + + val kvs = _kvs.map { kv => implicitly[CanSKeyValue[T]].toSKeyValue(kv) } + + val kv = kvs.head + val (srcVertexId, labelWithDir, labelIdxSeq, _, _) = cacheElementOpt.map { e => + (e.srcVertex.id, e.labelWithDir, e.labelIndexSeq, false, 0) + }.getOrElse(parseRow(kv, version)) + + val (idxPropsRaw, tgtVertexIdRaw, op, tgtVertexIdInQualifier, _) = + if (kv.qualifier.isEmpty) parseDegreeQualifier(kv, version) + else parseQualifier(kv, version) + + val (props, _) = if (op == GraphUtil.operations("incrementCount")) { + // val countVal = Bytes.toLong(kv.value) + val countVal = bytesToLongFunc(kv.value, 0) + val dummyProps = Array(LabelMeta.countSeq -> InnerVal.withLong(countVal, version)) + (dummyProps, 8) + } else if (kv.qualifier.isEmpty) { + parseDegreeValue(kv, version) + } else { + parseValue(kv, version) + } + + val index = queryParam.label.indicesMap.getOrElse(labelIdxSeq, throw new RuntimeException(s"invalid index seq: ${queryParam.label.id.get}, ${labelIdxSeq}")) + 
+ + // assert(kv.qualifier.nonEmpty && index.metaSeqs.size == idxPropsRaw.size) + + val idxProps = for { + (seq, (k, v)) <- index.metaSeqs.zip(idxPropsRaw) + } yield { + if (k == LabelMeta.degreeSeq) k -> v + else seq -> v + } + + val idxPropsMap = idxProps.toMap + val tgtVertexId = if (tgtVertexIdInQualifier) { + idxPropsMap.get(LabelMeta.toSeq) match { + case None => tgtVertexIdRaw + case Some(vId) => TargetVertexId(HBaseType.DEFAULT_COL_ID, vId) + } + } else tgtVertexIdRaw + + val _mergedProps = (idxProps ++ props).toMap + val mergedProps = + if (_mergedProps.contains(LabelMeta.timeStampSeq)) _mergedProps + else _mergedProps + (LabelMeta.timeStampSeq -> InnerVal.withLong(kv.timestamp, version)) + + // logger.error(s"$mergedProps") + // val ts = mergedProps(LabelMeta.timeStampSeq).toString().toLong + + val ts = kv.timestamp + IndexEdge(Vertex(srcVertexId, ts), Vertex(tgtVertexId, ts), labelWithDir, op, ts, labelIdxSeq, mergedProps) + + } + } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeSerializable.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeSerializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeSerializable.scala new file mode 100644 index 0000000..6c70ae1 --- /dev/null +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeSerializable.scala @@ -0,0 +1,52 @@ +package org.apache.s2graph.core.storage.serde.indexedge.wide + +import org.apache.hadoop.hbase.util.Bytes +import org.apache.s2graph.core.mysqls.LabelMeta +import org.apache.s2graph.core.storage.{SKeyValue, Serializable, StorageSerializable} +import org.apache.s2graph.core.types.VertexId +import org.apache.s2graph.core.{GraphUtil, IndexEdge} + +class IndexEdgeSerializable(indexEdge: 
IndexEdge) extends Serializable[IndexEdge] { + import StorageSerializable._ + + val label = indexEdge.label + val table = label.hbaseTableName.getBytes() + val cf = Serializable.edgeCf + + val idxPropsMap = indexEdge.orders.toMap + val idxPropsBytes = propsToBytes(indexEdge.orders) + + override def toKeyValues: Seq[SKeyValue] = { + val srcIdBytes = VertexId.toSourceVertexId(indexEdge.srcVertex.id).bytes + val labelWithDirBytes = indexEdge.labelWithDir.bytes + val labelIndexSeqWithIsInvertedBytes = labelOrderSeqWithIsInverted(indexEdge.labelIndexSeq, isInverted = false) + + val row = Bytes.add(srcIdBytes, labelWithDirBytes, labelIndexSeqWithIsInvertedBytes) + // logger.error(s"${row.toList}\n${srcIdBytes.toList}\n${labelWithDirBytes.toList}\n${labelIndexSeqWithIsInvertedBytes.toList}") + val tgtIdBytes = VertexId.toTargetVertexId(indexEdge.tgtVertex.id).bytes + val qualifier = + if (indexEdge.degreeEdge) Array.empty[Byte] + else { + if (indexEdge.op == GraphUtil.operations("incrementCount")) { + Bytes.add(idxPropsBytes, tgtIdBytes, Array.fill(1)(indexEdge.op)) + } else { + idxPropsMap.get(LabelMeta.toSeq) match { + case None => Bytes.add(idxPropsBytes, tgtIdBytes) + case Some(vId) => idxPropsBytes + } + } + } + + + val value = + if (indexEdge.degreeEdge) + Bytes.toBytes(indexEdge.propsWithTs(LabelMeta.degreeSeq).innerVal.toString().toLong) + else if (indexEdge.op == GraphUtil.operations("incrementCount")) + Bytes.toBytes(indexEdge.propsWithTs(LabelMeta.countSeq).innerVal.toString().toLong) + else propsToKeyValues(indexEdge.metas.toSeq) + + val kv = SKeyValue(table, row, cf, qualifier, value, indexEdge.version) + + Seq(kv) + } + } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeDeserializable.scala ---------------------------------------------------------------------- diff --git 
a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeDeserializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeDeserializable.scala new file mode 100644 index 0000000..37d5910 --- /dev/null +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeDeserializable.scala @@ -0,0 +1,84 @@ +package org.apache.s2graph.core.storage.serde.snapshotedge.tall + +import org.apache.hadoop.hbase.util.Bytes +import org.apache.s2graph.core.mysqls.{LabelIndex, LabelMeta} +import org.apache.s2graph.core.storage.StorageDeserializable._ +import org.apache.s2graph.core.storage.{CanSKeyValue, Deserializable, SKeyValue, StorageDeserializable} +import org.apache.s2graph.core.types.{HBaseType, LabelWithDirection, SourceAndTargetVertexIdPair, SourceVertexId} +import org.apache.s2graph.core.{Edge, QueryParam, SnapshotEdge, Vertex} + +class SnapshotEdgeDeserializable extends Deserializable[SnapshotEdge] { + + def statusCodeWithOp(byte: Byte): (Byte, Byte) = { + val statusCode = byte >> 4 + val op = byte & ((1 << 4) - 1) + (statusCode.toByte, op.toByte) + } + + override def fromKeyValuesInner[T: CanSKeyValue](queryParam: QueryParam, + _kvs: Seq[T], + version: String, + cacheElementOpt: Option[SnapshotEdge]): SnapshotEdge = { + val kvs = _kvs.map { kv => implicitly[CanSKeyValue[T]].toSKeyValue(kv) } + assert(kvs.size == 1) + + val kv = kvs.head + val schemaVer = queryParam.label.schemaVersion + val cellVersion = kv.timestamp + /** rowKey */ + def parseRowV3(kv: SKeyValue, version: String) = { + var pos = 0 + val (srcIdAndTgtId, srcIdAndTgtIdLen) = SourceAndTargetVertexIdPair.fromBytes(kv.row, pos, kv.row.length, version) + pos += srcIdAndTgtIdLen + val labelWithDir = LabelWithDirection(Bytes.toInt(kv.row, pos, 4)) + pos += 4 + val (labelIdxSeq, isInverted) = bytesToLabelIndexSeqWithIsInverted(kv.row, pos) + + val rowLen = srcIdAndTgtIdLen + 4 + 1 + 
(srcIdAndTgtId.srcInnerId, srcIdAndTgtId.tgtInnerId, labelWithDir, labelIdxSeq, isInverted, rowLen) + + } + val (srcInnerId, tgtInnerId, labelWithDir, _, _, _) = cacheElementOpt.map { e => + (e.srcVertex.innerId, e.tgtVertex.innerId, e.labelWithDir, LabelIndex.DefaultSeq, true, 0) + }.getOrElse(parseRowV3(kv, schemaVer)) + + val srcVertexId = SourceVertexId(HBaseType.DEFAULT_COL_ID, srcInnerId) + val tgtVertexId = SourceVertexId(HBaseType.DEFAULT_COL_ID, tgtInnerId) + + val (props, op, ts, statusCode, _pendingEdgeOpt) = { + var pos = 0 + val (statusCode, op) = statusCodeWithOp(kv.value(pos)) + pos += 1 + val (props, endAt) = bytesToKeyValuesWithTs(kv.value, pos, schemaVer) + val kvsMap = props.toMap + val ts = kvsMap(LabelMeta.timeStampSeq).innerVal.toString.toLong + + pos = endAt + val _pendingEdgeOpt = + if (pos == kv.value.length) None + else { + val (pendingEdgeStatusCode, pendingEdgeOp) = statusCodeWithOp(kv.value(pos)) + pos += 1 + // val versionNum = Bytes.toLong(kv.value, pos, 8) + // pos += 8 + val (pendingEdgeProps, endAt) = bytesToKeyValuesWithTs(kv.value, pos, schemaVer) + pos = endAt + val lockTs = Option(Bytes.toLong(kv.value, pos, 8)) + + val pendingEdge = + Edge(Vertex(srcVertexId, cellVersion), + Vertex(tgtVertexId, cellVersion), + labelWithDir, pendingEdgeOp, + cellVersion, pendingEdgeProps.toMap, + statusCode = pendingEdgeStatusCode, lockTs = lockTs) + Option(pendingEdge) + } + + (kvsMap, op, ts, statusCode, _pendingEdgeOpt) + } + + SnapshotEdge(Vertex(srcVertexId, ts), Vertex(tgtVertexId, ts), + labelWithDir, op, cellVersion, props, statusCode = statusCode, + pendingEdgeOpt = _pendingEdgeOpt, lockTs = None) + } +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeSerializable.scala ---------------------------------------------------------------------- diff --git 
a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeSerializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeSerializable.scala new file mode 100644 index 0000000..f018827 --- /dev/null +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeSerializable.scala @@ -0,0 +1,47 @@ +package org.apache.s2graph.core.storage.serde.snapshotedge.tall + +import org.apache.hadoop.hbase.util.Bytes +import org.apache.s2graph.core.SnapshotEdge +import org.apache.s2graph.core.mysqls.LabelIndex +import org.apache.s2graph.core.storage.{SKeyValue, Serializable, StorageSerializable} +import org.apache.s2graph.core.types.SourceAndTargetVertexIdPair + + +class SnapshotEdgeSerializable(snapshotEdge: SnapshotEdge) extends Serializable[SnapshotEdge] { + import StorageSerializable._ + + val label = snapshotEdge.label + val table = label.hbaseTableName.getBytes() + val cf = Serializable.edgeCf + + def statusCodeWithOp(statusCode: Byte, op: Byte): Array[Byte] = { + val byte = (((statusCode << 4) | op).toByte) + Array.fill(1)(byte.toByte) + } + def valueBytes() = Bytes.add(statusCodeWithOp(snapshotEdge.statusCode, snapshotEdge.op), + propsToKeyValuesWithTs(snapshotEdge.props.toList)) + + override def toKeyValues: Seq[SKeyValue] = { + val srcIdAndTgtIdBytes = SourceAndTargetVertexIdPair(snapshotEdge.srcVertex.innerId, snapshotEdge.tgtVertex.innerId).bytes + val labelWithDirBytes = snapshotEdge.labelWithDir.bytes + val labelIndexSeqWithIsInvertedBytes = labelOrderSeqWithIsInverted(LabelIndex.DefaultSeq, isInverted = true) + + val row = Bytes.add(srcIdAndTgtIdBytes, labelWithDirBytes, labelIndexSeqWithIsInvertedBytes) + + val qualifier = Array.empty[Byte] + + val value = snapshotEdge.pendingEdgeOpt match { + case None => valueBytes() + case Some(pendingEdge) => + val opBytes = statusCodeWithOp(pendingEdge.statusCode, pendingEdge.op) + val versionBytes = 
Array.empty[Byte] + val propsBytes = propsToKeyValuesWithTs(pendingEdge.propsWithTs.toSeq) + val lockBytes = Bytes.toBytes(pendingEdge.lockTs.get) + + Bytes.add(Bytes.add(valueBytes(), opBytes, versionBytes), Bytes.add(propsBytes, lockBytes)) + } + + val kv = SKeyValue(table, row, cf, qualifier, value, snapshotEdge.version) + Seq(kv) + } +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeDeserializable.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeDeserializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeDeserializable.scala new file mode 100644 index 0000000..68eb125 --- /dev/null +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeDeserializable.scala @@ -0,0 +1,70 @@ +package org.apache.s2graph.core.storage.serde.snapshotedge.wide + +import org.apache.hadoop.hbase.util.Bytes +import org.apache.s2graph.core.mysqls.{LabelIndex, LabelMeta} +import org.apache.s2graph.core.storage.StorageDeserializable._ +import org.apache.s2graph.core.storage.{CanSKeyValue, Deserializable, StorageDeserializable} +import org.apache.s2graph.core.types.TargetVertexId +import org.apache.s2graph.core.{Edge, QueryParam, SnapshotEdge, Vertex} + +class SnapshotEdgeDeserializable extends Deserializable[SnapshotEdge] { + + def statusCodeWithOp(byte: Byte): (Byte, Byte) = { + val statusCode = byte >> 4 + val op = byte & ((1 << 4) - 1) + (statusCode.toByte, op.toByte) + } + + override def fromKeyValuesInner[T: CanSKeyValue](queryParam: QueryParam, + _kvs: Seq[T], + version: String, + cacheElementOpt: Option[SnapshotEdge]): SnapshotEdge = { + val kvs = _kvs.map { kv => implicitly[CanSKeyValue[T]].toSKeyValue(kv) } + assert(kvs.size == 1) + + val kv = 
kvs.head + val schemaVer = queryParam.label.schemaVersion + val cellVersion = kv.timestamp + + val (srcVertexId, labelWithDir, _, _, _) = cacheElementOpt.map { e => + (e.srcVertex.id, e.labelWithDir, LabelIndex.DefaultSeq, true, 0) + }.getOrElse(parseRow(kv, schemaVer)) + + val (tgtVertexId, props, op, ts, statusCode, _pendingEdgeOpt) = { + val (tgtVertexId, _) = TargetVertexId.fromBytes(kv.qualifier, 0, kv.qualifier.length, schemaVer) + var pos = 0 + val (statusCode, op) = statusCodeWithOp(kv.value(pos)) + pos += 1 + val (props, endAt) = bytesToKeyValuesWithTs(kv.value, pos, schemaVer) + val kvsMap = props.toMap + val ts = kvsMap(LabelMeta.timeStampSeq).innerVal.toString.toLong + + pos = endAt + val _pendingEdgeOpt = + if (pos == kv.value.length) None + else { + val (pendingEdgeStatusCode, pendingEdgeOp) = statusCodeWithOp(kv.value(pos)) + pos += 1 + // val versionNum = Bytes.toLong(kv.value, pos, 8) + // pos += 8 + val (pendingEdgeProps, endAt) = bytesToKeyValuesWithTs(kv.value, pos, schemaVer) + pos = endAt + val lockTs = Option(Bytes.toLong(kv.value, pos, 8)) + + val pendingEdge = + Edge(Vertex(srcVertexId, cellVersion), + Vertex(tgtVertexId, cellVersion), + labelWithDir, pendingEdgeOp, + cellVersion, pendingEdgeProps.toMap, + statusCode = pendingEdgeStatusCode, lockTs = lockTs) + Option(pendingEdge) + } + + (tgtVertexId, kvsMap, op, ts, statusCode, _pendingEdgeOpt) + } + + SnapshotEdge(Vertex(srcVertexId, ts), Vertex(tgtVertexId, ts), + labelWithDir, op, cellVersion, props, statusCode = statusCode, + pendingEdgeOpt = _pendingEdgeOpt, lockTs = None) + } +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeSerializable.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeSerializable.scala 
b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeSerializable.scala new file mode 100644 index 0000000..e4d0ac1 --- /dev/null +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeSerializable.scala @@ -0,0 +1,51 @@ +package org.apache.s2graph.core.storage.serde.snapshotedge.wide + +import org.apache.hadoop.hbase.util.Bytes +import org.apache.s2graph.core.SnapshotEdge +import org.apache.s2graph.core.mysqls.LabelIndex +import org.apache.s2graph.core.storage.{SKeyValue, Serializable, StorageSerializable} +import org.apache.s2graph.core.types.VertexId + + + +/** + * this class serialize + * @param snapshotEdge + */ +class SnapshotEdgeSerializable(snapshotEdge: SnapshotEdge) extends Serializable[SnapshotEdge] { + import StorageSerializable._ + + val label = snapshotEdge.label + val table = label.hbaseTableName.getBytes() + val cf = Serializable.edgeCf + + def statusCodeWithOp(statusCode: Byte, op: Byte): Array[Byte] = { + val byte = (((statusCode << 4) | op).toByte) + Array.fill(1)(byte.toByte) + } + def valueBytes() = Bytes.add(statusCodeWithOp(snapshotEdge.statusCode, snapshotEdge.op), + propsToKeyValuesWithTs(snapshotEdge.props.toList)) + + override def toKeyValues: Seq[SKeyValue] = { + val srcIdBytes = VertexId.toSourceVertexId(snapshotEdge.srcVertex.id).bytes + val labelWithDirBytes = snapshotEdge.labelWithDir.bytes + val labelIndexSeqWithIsInvertedBytes = labelOrderSeqWithIsInverted(LabelIndex.DefaultSeq, isInverted = true) + + val row = Bytes.add(srcIdBytes, labelWithDirBytes, labelIndexSeqWithIsInvertedBytes) + val tgtIdBytes = VertexId.toTargetVertexId(snapshotEdge.tgtVertex.id).bytes + + val qualifier = tgtIdBytes + + val value = snapshotEdge.pendingEdgeOpt match { + case None => valueBytes() + case Some(pendingEdge) => + val opBytes = statusCodeWithOp(pendingEdge.statusCode, pendingEdge.op) + val versionBytes = Array.empty[Byte] + val propsBytes = 
propsToKeyValuesWithTs(pendingEdge.propsWithTs.toSeq) + val lockBytes = Bytes.toBytes(pendingEdge.lockTs.get) + Bytes.add(Bytes.add(valueBytes(), opBytes, versionBytes), Bytes.add(propsBytes, lockBytes)) + } + val kv = SKeyValue(table, row, cf, qualifier, value, snapshotEdge.version) + Seq(kv) + } +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/VertexDeserializable.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/VertexDeserializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/VertexDeserializable.scala new file mode 100644 index 0000000..00a5dc2 --- /dev/null +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/VertexDeserializable.scala @@ -0,0 +1,47 @@ +package org.apache.s2graph.core.storage.serde.vertex + +import org.apache.hadoop.hbase.util.Bytes +import org.apache.s2graph.core.storage.{CanSKeyValue, Deserializable} +import org.apache.s2graph.core.types.{InnerVal, InnerValLike, VertexId} +import org.apache.s2graph.core.{QueryParam, Vertex} + +import scala.collection.mutable.ListBuffer + +class VertexDeserializable extends Deserializable[Vertex] { + def fromKeyValuesInner[T: CanSKeyValue](queryParam: QueryParam, + _kvs: Seq[T], + version: String, + cacheElementOpt: Option[Vertex]): Vertex = { + + val kvs = _kvs.map { kv => implicitly[CanSKeyValue[T]].toSKeyValue(kv) } + + val kv = kvs.head + val (vertexId, _) = VertexId.fromBytes(kv.row, 0, kv.row.length, version) + + var maxTs = Long.MinValue + val propsMap = new collection.mutable.HashMap[Int, InnerValLike] + val belongLabelIds = new ListBuffer[Int] + + for { + kv <- kvs + } { + val propKey = + if (kv.qualifier.length == 1) kv.qualifier.head.toInt + else Bytes.toInt(kv.qualifier) + + val ts = kv.timestamp + if (ts > maxTs) maxTs = ts + + if 
(Vertex.isLabelId(propKey)) { + belongLabelIds += Vertex.toLabelId(propKey) + } else { + val v = kv.value + val (value, _) = InnerVal.fromBytes(v, 0, v.length, version) + propsMap += (propKey -> value) + } + } + assert(maxTs != Long.MinValue) + Vertex(vertexId, maxTs, propsMap.toMap, belongLabelIds = belongLabelIds) + } +} + http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/VertexSerializable.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/VertexSerializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/VertexSerializable.scala new file mode 100644 index 0000000..a81a86e --- /dev/null +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/VertexSerializable.scala @@ -0,0 +1,19 @@ +package org.apache.s2graph.core.storage.serde.vertex + +import org.apache.hadoop.hbase.util.Bytes +import org.apache.s2graph.core.Vertex +import org.apache.s2graph.core.storage.{SKeyValue, Serializable} + +case class VertexSerializable(vertex: Vertex) extends Serializable[Vertex] { + + val cf = Serializable.vertexCf + + override def toKeyValues: Seq[SKeyValue] = { + val row = vertex.id.bytes + val base = for ((k, v) <- vertex.props ++ vertex.defaultProps) yield Bytes.toBytes(k) -> v.bytes + val belongsTo = vertex.belongLabelIds.map { labelId => Bytes.toBytes(Vertex.toPropKey(labelId)) -> Array.empty[Byte] } + (base ++ belongsTo).map { case (qualifier, value) => + SKeyValue(vertex.hbaseTableName.getBytes, row, cf, qualifier, value, vertex.ts) + } toSeq + } +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2core/src/main/scala/org/apache/s2graph/core/types/HBaseType.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/types/HBaseType.scala 
b/s2core/src/main/scala/org/apache/s2graph/core/types/HBaseType.scala new file mode 100644 index 0000000..a207547 --- /dev/null +++ b/s2core/src/main/scala/org/apache/s2graph/core/types/HBaseType.scala @@ -0,0 +1,160 @@ +package org.apache.s2graph.core.types + +import org.apache.hadoop.hbase.util.Bytes + +object HBaseType { + val VERSION4 = "v4" + val VERSION3 = "v3" + val VERSION2 = "v2" + val VERSION1 = "v1" +// val DEFAULT_VERSION = VERSION2 + val DEFAULT_VERSION = VERSION3 + val EMPTY_SEQ_BYTE = Byte.MaxValue + val DEFAULT_COL_ID = 0 + val bitsForDir = 2 + val maxBytes = Bytes.toBytes(Int.MaxValue) + val toSeqByte = -5.toByte + val defaultTgtVertexId = null +} + +object HBaseDeserializable { + + import HBaseType._ + + // 6 bits is used for index sequence so total index per label is limited to 2^6 + def bytesToLabelIndexSeqWithIsInverted(bytes: Array[Byte], offset: Int): (Byte, Boolean) = { + val byte = bytes(offset) + val isInverted = if ((byte & 1) != 0) true else false + val labelOrderSeq = byte >> 1 + (labelOrderSeq.toByte, isInverted) + } + + def bytesToKeyValues(bytes: Array[Byte], + offset: Int, + length: Int, + version: String = DEFAULT_VERSION): (Array[(Byte, InnerValLike)], Int) = { + var pos = offset + val len = bytes(pos) + pos += 1 + val kvs = new Array[(Byte, InnerValLike)](len) + var i = 0 + while (i < len) { + val k = bytes(pos) + pos += 1 + val (v, numOfBytesUsed) = InnerVal.fromBytes(bytes, pos, 0, version) + pos += numOfBytesUsed + kvs(i) = (k -> v) + i += 1 + } + val ret = (kvs, pos) + // logger.debug(s"bytesToProps: $ret") + ret + } + + def bytesToKeyValuesWithTs(bytes: Array[Byte], + offset: Int, + version: String = DEFAULT_VERSION): (Array[(Byte, InnerValLikeWithTs)], Int) = { + var pos = offset + val len = bytes(pos) + pos += 1 + val kvs = new Array[(Byte, InnerValLikeWithTs)](len) + var i = 0 + while (i < len) { + val k = bytes(pos) + pos += 1 + val (v, numOfBytesUsed) = InnerValLikeWithTs.fromBytes(bytes, pos, 0, version) + pos += 
numOfBytesUsed + kvs(i) = (k -> v) + i += 1 + } + val ret = (kvs, pos) + // logger.debug(s"bytesToProps: $ret") + ret + } + + def bytesToProps(bytes: Array[Byte], + offset: Int, + version: String = DEFAULT_VERSION): (Array[(Byte, InnerValLike)], Int) = { + var pos = offset + val len = bytes(pos) + pos += 1 + val kvs = new Array[(Byte, InnerValLike)](len) + var i = 0 + while (i < len) { + val k = EMPTY_SEQ_BYTE + val (v, numOfBytesUsed) = InnerVal.fromBytes(bytes, pos, 0, version) + pos += numOfBytesUsed + kvs(i) = (k -> v) + i += 1 + } + // logger.error(s"bytesToProps: $kvs") + val ret = (kvs, pos) + + ret + } +} + +object HBaseSerializable { + def propsToBytes(props: Seq[(Byte, InnerValLike)]): Array[Byte] = { + val len = props.length + assert(len < Byte.MaxValue) + var bytes = Array.fill(1)(len.toByte) + for ((k, v) <- props) bytes = Bytes.add(bytes, v.bytes) + bytes + } + + def propsToKeyValues(props: Seq[(Byte, InnerValLike)]): Array[Byte] = { + val len = props.length + assert(len < Byte.MaxValue) + var bytes = Array.fill(1)(len.toByte) + for ((k, v) <- props) bytes = Bytes.add(bytes, Array.fill(1)(k), v.bytes) + bytes + } + + def propsToKeyValuesWithTs(props: Seq[(Byte, InnerValLikeWithTs)]): Array[Byte] = { + val len = props.length + assert(len < Byte.MaxValue) + var bytes = Array.fill(1)(len.toByte) + for ((k, v) <- props) bytes = Bytes.add(bytes, Array.fill(1)(k), v.bytes) + bytes + } + + def labelOrderSeqWithIsInverted(labelOrderSeq: Byte, isInverted: Boolean): Array[Byte] = { + assert(labelOrderSeq < (1 << 6)) + val byte = labelOrderSeq << 1 | (if (isInverted) 1 else 0) + Array.fill(1)(byte.toByte) + } +} + +trait HBaseSerializable { + def bytes: Array[Byte] +} + +trait HBaseDeserializable { + + import HBaseType._ + + def fromBytes(bytes: Array[Byte], + offset: Int, + len: Int, + version: String = DEFAULT_VERSION): (HBaseSerializable, Int) + + // def fromBytesWithIndex(bytes: Array[Byte], + // offset: Int, + // len: Int, + // version: String = 
DEFAULT_VERSION): (HBaseSerializable, Int) + def notSupportedEx(version: String) = new RuntimeException(s"not supported version, $version") +} + +trait HBaseDeserializableWithIsVertexId { + + import HBaseType._ + + def fromBytes(bytes: Array[Byte], + offset: Int, + len: Int, + version: String = DEFAULT_VERSION, + isVertexId: Boolean = false): (HBaseSerializable, Int) + + def notSupportedEx(version: String) = new RuntimeException(s"not supported version, $version") +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2core/src/main/scala/org/apache/s2graph/core/types/InnerValLike.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/types/InnerValLike.scala b/s2core/src/main/scala/org/apache/s2graph/core/types/InnerValLike.scala new file mode 100644 index 0000000..dd1b833 --- /dev/null +++ b/s2core/src/main/scala/org/apache/s2graph/core/types/InnerValLike.scala @@ -0,0 +1,243 @@ +package org.apache.s2graph.core.types + +import org.apache.hadoop.hbase.util._ + +object InnerVal extends HBaseDeserializableWithIsVertexId { + import HBaseType._ + + val order = Order.DESCENDING + val stringLenOffset = 7.toByte + val maxStringLen = Byte.MaxValue - stringLenOffset + val maxMetaByte = Byte.MaxValue + val minMetaByte = 0.toByte + + /** supported data type */ + val BLOB = "blob" + val STRING = "string" + val DOUBLE = "double" + val FLOAT = "float" + val LONG = "long" + val INT = "integer" + val SHORT = "short" + val BYTE = "byte" + val NUMERICS = List(DOUBLE, FLOAT, LONG, INT, SHORT, BYTE) + val BOOLEAN = "boolean" + + def isNumericType(dataType: String): Boolean = { + dataType match { + case InnerVal.BYTE | InnerVal.SHORT | InnerVal.INT | InnerVal.LONG | InnerVal.FLOAT | InnerVal.DOUBLE => true + case _ => false + } + } + def toInnerDataType(dataType: String): String = { + dataType match { + case "blob" => BLOB + case "string" | "str" | "s" => 
STRING + case "double" | "d" | "float64" => DOUBLE + case "float" | "f" | "float32" => FLOAT + case "long" | "l" | "int64" | "integer64" => LONG + case "int" | "integer" | "i" | "int32" | "integer32" => INT + case "short" | "int16" | "integer16" => SHORT + case "byte" | "b" | "tinyint" | "int8" | "integer8" => BYTE + case "boolean" | "bool" => BOOLEAN + case _ => throw new RuntimeException(s"can`t convert $dataType into InnerDataType") + } + } + + def numByteRange(num: BigDecimal) = { +// val byteLen = +// if (num.isValidByte | num.isValidChar) 1 +// else if (num.isValidShort) 2 +// else if (num.isValidInt) 4 +// else if (num.isValidLong) 8 +// else if (num.isValidFloat) 4 +// else 12 + val byteLen = 12 + // else throw new RuntimeException(s"wrong data $num") + new SimplePositionedMutableByteRange(byteLen + 4) + } + + def fromBytes(bytes: Array[Byte], + offset: Int, + len: Int, + version: String, + isVertexId: Boolean): (InnerValLike, Int) = { + version match { + case VERSION2 | VERSION3 | VERSION4 => v2.InnerVal.fromBytes(bytes, offset, len, version, isVertexId) + case VERSION1 => v1.InnerVal.fromBytes(bytes, offset, len, version, isVertexId) + case _ => throw notSupportedEx(version) + } + } + + def withLong(l: Long, version: String): InnerValLike = { + version match { + case VERSION2 | VERSION3 | VERSION4 => v2.InnerVal(BigDecimal(l)) + case VERSION1 => v1.InnerVal(Some(l), None, None) + case _ => throw notSupportedEx(version) + } + } + + def withInt(i: Int, version: String): InnerValLike = { + version match { + case VERSION2 | VERSION3 | VERSION4 => v2.InnerVal(BigDecimal(i)) + case VERSION1 => v1.InnerVal(Some(i.toLong), None, None) + case _ => throw notSupportedEx(version) + } + } + + def withFloat(f: Float, version: String): InnerValLike = { + version match { + case VERSION2 | VERSION3 | VERSION4 => v2.InnerVal(BigDecimal(f.toDouble)) + case VERSION1 => v1.InnerVal(Some(f.toLong), None, None) + case _ => throw notSupportedEx(version) + } + } + + def 
withDouble(d: Double, version: String): InnerValLike = { + version match { + case VERSION2 | VERSION3 | VERSION4 => v2.InnerVal(BigDecimal(d)) + case VERSION1 => v1.InnerVal(Some(d.toLong), None, None) + case _ => throw notSupportedEx(version) + } + } + + def withNumber(num: BigDecimal, version: String): InnerValLike = { + version match { + case VERSION2 | VERSION3 | VERSION4 => v2.InnerVal(num) + case VERSION1 => v1.InnerVal(Some(num.toLong), None, None) + case _ => throw notSupportedEx(version) + } + } + + def withBoolean(b: Boolean, version: String): InnerValLike = { + version match { + case VERSION2 | VERSION3 | VERSION4 => v2.InnerVal(b) + case VERSION1 => v1.InnerVal(None, None, Some(b)) + case _ => throw notSupportedEx(version) + } + } + + def withBlob(blob: Array[Byte], version: String): InnerValLike = { + version match { + case VERSION2 | VERSION3 | VERSION4 => v2.InnerVal(blob) + case _ => throw notSupportedEx(version) + } + } + + def withStr(s: String, version: String): InnerValLike = { + version match { + case VERSION2 | VERSION3 | VERSION4 => v2.InnerVal(s) + case VERSION1 => v1.InnerVal(None, Some(s), None) + case _ => throw notSupportedEx(version) + } + } + +// def withInnerVal(innerVal: InnerValLike, version: String): InnerValLike = { +// val bytes = innerVal.bytes +// version match { +// case VERSION2 => v2.InnerVal.fromBytes(bytes, 0, bytes.length, version)._1 +// case VERSION1 => v1.InnerVal.fromBytes(bytes, 0, bytes.length, version)._1 +// case _ => throw notSupportedEx(version) +// } +// } + + /** nasty implementation for backward compatability */ + def convertVersion(innerVal: InnerValLike, dataType: String, toVersion: String): InnerValLike = { + val ret = toVersion match { + case VERSION2 | VERSION3 | VERSION4 => + if (innerVal.isInstanceOf[v1.InnerVal]) { + val obj = innerVal.asInstanceOf[v1.InnerVal] + obj.valueType match { + case "long" => InnerVal.withLong(obj.longV.get, toVersion) + case "string" => InnerVal.withStr(obj.strV.get, 
toVersion) + case "boolean" => InnerVal.withBoolean(obj.boolV.get, toVersion) + case _ => throw new Exception(s"InnerVal should be [long/integeer/short/byte/string/boolean]") + } + } else { + innerVal + } + case VERSION1 => + if (innerVal.isInstanceOf[v2.InnerVal]) { + val obj = innerVal.asInstanceOf[v2.InnerVal] + obj.value match { + case str: String => InnerVal.withStr(str, toVersion) + case b: Boolean => InnerVal.withBoolean(b, toVersion) + case n: BigDecimal => InnerVal.withNumber(n, toVersion) + case n: Long => InnerVal.withNumber(n, toVersion) + case n: Double => InnerVal.withNumber(n, toVersion) + case n: Int => InnerVal.withNumber(n, toVersion) + case _ => throw notSupportedEx(s"v2 to v1: $obj -> $toVersion") + } + } else { + innerVal + } + case _ => throw notSupportedEx(toVersion) + } +// logger.debug(s"convertVersion: $innerVal, $dataType, $toVersion, $ret, ${innerVal.bytes.toList}, ${ret.bytes.toList}") + ret + } + +} + +trait InnerValLike extends HBaseSerializable { + + val value: Any + + def compare(other: InnerValLike): Int + + def +(other: InnerValLike): InnerValLike + + def <(other: InnerValLike) = this.compare(other) < 0 + + def <=(other: InnerValLike) = this.compare(other) <= 0 + + def >(other: InnerValLike) = this.compare(other) > 0 + + def >=(other: InnerValLike) = this.compare(other) >= 0 + + override def toString(): String = value.toString + + override def hashCode(): Int = value.hashCode() + + override def equals(obj: Any): Boolean = { + obj match { + case other: InnerValLike => + val ret = toString == obj.toString +// logger.debug(s"InnerValLike.equals($this, $obj) => $ret") + ret + case _ => false + } + } + def hashKey(dataType: String): Int + + def toIdString(): String + +} + +object InnerValLikeWithTs extends HBaseDeserializable { + import HBaseType._ + def fromBytes(bytes: Array[Byte], + offset: Int, + len: Int, + version: String = DEFAULT_VERSION): (InnerValLikeWithTs, Int) = { + val (innerVal, numOfBytesUsed) = 
InnerVal.fromBytes(bytes, offset, len, version) + val ts = Bytes.toLong(bytes, offset + numOfBytesUsed) + (InnerValLikeWithTs(innerVal, ts), numOfBytesUsed + 8) + } + + def withLong(l: Long, ts: Long, version: String): InnerValLikeWithTs = { + InnerValLikeWithTs(InnerVal.withLong(l, version), ts) + } + + def withStr(s: String, ts: Long, version: String): InnerValLikeWithTs = { + InnerValLikeWithTs(InnerVal.withStr(s, version), ts) + } +} + +case class InnerValLikeWithTs(innerVal: InnerValLike, ts: Long) + extends HBaseSerializable { + + def bytes: Array[Byte] = { + Bytes.add(innerVal.bytes, Bytes.toBytes(ts)) + } +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2core/src/main/scala/org/apache/s2graph/core/types/LabelWithDirection.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/types/LabelWithDirection.scala b/s2core/src/main/scala/org/apache/s2graph/core/types/LabelWithDirection.scala new file mode 100644 index 0000000..d34299b --- /dev/null +++ b/s2core/src/main/scala/org/apache/s2graph/core/types/LabelWithDirection.scala @@ -0,0 +1,61 @@ +package org.apache.s2graph.core.types + +import org.apache.hadoop.hbase.util.Bytes +import org.apache.s2graph.core.GraphUtil + +object LabelWithDirection { + + import HBaseType._ + + def apply(compositeInt: Int): LabelWithDirection = { + // logger.debug(s"CompositeInt: $compositeInt") + + val dir = compositeInt & ((1 << bitsForDir) - 1) + val labelId = compositeInt >> bitsForDir + LabelWithDirection(labelId, dir) + } + + def labelOrderSeqWithIsInverted(labelOrderSeq: Byte, isInverted: Boolean): Array[Byte] = { + assert(labelOrderSeq < (1 << 6)) + val byte = labelOrderSeq << 1 | (if (isInverted) 1 else 0) + Array.fill(1)(byte.toByte) + } + + def bytesToLabelIndexSeqWithIsInverted(bytes: Array[Byte], offset: Int): (Byte, Boolean) = { + val byte = bytes(offset) + val isInverted = if ((byte & 1) != 0) true else false + 
val labelOrderSeq = byte >> 1 + (labelOrderSeq.toByte, isInverted) + } +} + +case class LabelWithDirection(labelId: Int, dir: Int) extends HBaseSerializable { + + import HBaseType._ + + assert(dir < (1 << bitsForDir)) + assert(labelId < (Int.MaxValue >> bitsForDir)) + + lazy val labelBits = labelId << bitsForDir + + lazy val compositeInt = labelBits | dir + + def bytes = { + Bytes.toBytes(compositeInt) + } + + lazy val dirToggled = LabelWithDirection(labelId, GraphUtil.toggleDir(dir)) + + def updateDir(newDir: Int) = LabelWithDirection(labelId, newDir) + + def isDirected = dir == 0 || dir == 1 + + override def hashCode(): Int = compositeInt + + override def equals(other: Any): Boolean = { + other match { + case o: LabelWithDirection => hashCode == o.hashCode() + case _ => false + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2core/src/main/scala/org/apache/s2graph/core/types/VertexId.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/types/VertexId.scala b/s2core/src/main/scala/org/apache/s2graph/core/types/VertexId.scala new file mode 100644 index 0000000..79c7122 --- /dev/null +++ b/s2core/src/main/scala/org/apache/s2graph/core/types/VertexId.scala @@ -0,0 +1,142 @@ +package org.apache.s2graph.core.types + +import org.apache.hadoop.hbase.util.Bytes +import org.apache.s2graph.core.GraphUtil +import org.apache.s2graph.core.types.HBaseType._ + +object VertexId extends HBaseDeserializable { + import HBaseType._ + def fromBytes(bytes: Array[Byte], + offset: Int, + len: Int, + version: String = DEFAULT_VERSION): (VertexId, Int) = { + /** since murmur hash is prepended, skip numOfBytes for murmur hash */ + var pos = offset + GraphUtil.bytesForMurMurHash + + val (innerId, numOfBytesUsed) = InnerVal.fromBytes(bytes, pos, len, version, isVertexId = true) + pos += numOfBytesUsed + val colId = Bytes.toInt(bytes, pos, 4) + (VertexId(colId, innerId), 
GraphUtil.bytesForMurMurHash + numOfBytesUsed + 4) + } + + def apply(colId: Int, innerId: InnerValLike): VertexId = new VertexId(colId, innerId) + + def toSourceVertexId(vid: VertexId) = { + SourceVertexId(vid.colId, vid.innerId) + } + + def toTargetVertexId(vid: VertexId) = { + TargetVertexId(vid.colId, vid.innerId) + } +} + +class VertexId protected (val colId: Int, val innerId: InnerValLike) extends HBaseSerializable { + val storeHash: Boolean = true + val storeColId: Boolean = true + lazy val hashBytes = +// if (storeHash) Bytes.toBytes(GraphUtil.murmur3(innerId.toString)) + if (storeHash) Bytes.toBytes(GraphUtil.murmur3(innerId.toIdString())) + else Array.empty[Byte] + + lazy val colIdBytes: Array[Byte] = + if (storeColId) Bytes.toBytes(colId) + else Array.empty[Byte] + + def bytes: Array[Byte] = Bytes.add(hashBytes, innerId.bytes, colIdBytes) + + override def toString(): String = { + colId.toString() + "," + innerId.toString() +// s"VertexId($colId, $innerId)" + } + + override def hashCode(): Int = { + val ret = if (storeColId) { + colId * 31 + innerId.hashCode() + } else { + innerId.hashCode() + } +// logger.debug(s"VertexId.hashCode: $ret") + ret + } + override def equals(obj: Any): Boolean = { + val ret = obj match { + case other: VertexId => colId == other.colId && innerId.toIdString() == other.innerId.toIdString() + case _ => false + } +// logger.debug(s"VertexId.equals: $this, $obj => $ret") + ret + } + + def compareTo(other: VertexId): Int = { + Bytes.compareTo(bytes, other.bytes) + } + def <(other: VertexId): Boolean = compareTo(other) < 0 + def <=(other: VertexId): Boolean = compareTo(other) <= 0 + def >(other: VertexId): Boolean = compareTo(other) > 0 + def >=(other: VertexId): Boolean = compareTo(other) >= 0 +} + +object SourceVertexId extends HBaseDeserializable { + import HBaseType._ + def fromBytes(bytes: Array[Byte], + offset: Int, + len: Int, + version: String = DEFAULT_VERSION): (VertexId, Int) = { + /** since murmur hash is prepended, skip 
numOfBytes for murmur hash */ + val pos = offset + GraphUtil.bytesForMurMurHash + val (innerId, numOfBytesUsed) = InnerVal.fromBytes(bytes, pos, len, version, isVertexId = true) + + (SourceVertexId(DEFAULT_COL_ID, innerId), GraphUtil.bytesForMurMurHash + numOfBytesUsed) + } + +} + + +case class SourceVertexId(override val colId: Int, + override val innerId: InnerValLike) extends VertexId(colId, innerId) { + override val storeColId: Boolean = false +} + +object TargetVertexId extends HBaseDeserializable { + import HBaseType._ + def fromBytes(bytes: Array[Byte], + offset: Int, + len: Int, + version: String = DEFAULT_VERSION): (VertexId, Int) = { + /** murmur has is not prepended so start from offset */ + val (innerId, numOfBytesUsed) = InnerVal.fromBytes(bytes, offset, len, version, isVertexId = true) + (TargetVertexId(DEFAULT_COL_ID, innerId), numOfBytesUsed) + } +} + +case class TargetVertexId(override val colId: Int, + override val innerId: InnerValLike) + extends VertexId(colId, innerId) { + override val storeColId: Boolean = false + override val storeHash: Boolean = false + +} + +object SourceAndTargetVertexIdPair extends HBaseDeserializable { + val delimiter = ":" + import HBaseType._ + def fromBytes(bytes: Array[Byte], + offset: Int, + len: Int, + version: String = DEFAULT_VERSION): (SourceAndTargetVertexIdPair, Int) = { + val pos = offset + GraphUtil.bytesForMurMurHash + val (srcId, srcBytesLen) = InnerVal.fromBytes(bytes, pos, len, version, isVertexId = true) + val (tgtId, tgtBytesLen) = InnerVal.fromBytes(bytes, pos + srcBytesLen, len, version, isVertexId = true) + (SourceAndTargetVertexIdPair(srcId, tgtId), GraphUtil.bytesForMurMurHash + srcBytesLen + tgtBytesLen) + } +} + +case class SourceAndTargetVertexIdPair(val srcInnerId: InnerValLike, val tgtInnerId: InnerValLike) extends HBaseSerializable { + val colId = DEFAULT_COL_ID + import SourceAndTargetVertexIdPair._ + override def bytes: Array[Byte] = { + val hashBytes = 
Bytes.toBytes(GraphUtil.murmur3(srcInnerId + delimiter + tgtInnerId)) + Bytes.add(hashBytes, srcInnerId.bytes, tgtInnerId.bytes) + } +} + http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2core/src/main/scala/org/apache/s2graph/core/types/v1/InnerVal.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/types/v1/InnerVal.scala b/s2core/src/main/scala/org/apache/s2graph/core/types/v1/InnerVal.scala new file mode 100644 index 0000000..c7b2c73 --- /dev/null +++ b/s2core/src/main/scala/org/apache/s2graph/core/types/v1/InnerVal.scala @@ -0,0 +1,224 @@ +package org.apache.s2graph.core.types.v1 + +import org.apache.hadoop.hbase.util.Bytes +import org.apache.s2graph.core.GraphExceptions +import org.apache.s2graph.core.GraphExceptions.IllegalDataTypeException +import org.apache.s2graph.core.types.{HBaseDeserializableWithIsVertexId, HBaseSerializable, HBaseType, InnerValLike} + +object InnerVal extends HBaseDeserializableWithIsVertexId { + import HBaseType._ + // val defaultVal = new InnerVal(None, None, None) + val stringLenOffset = 7.toByte + val maxStringLen = Byte.MaxValue - stringLenOffset + val maxMetaByte = Byte.MaxValue + val minMetaByte = 0.toByte + /** + * first byte encoding rule. 
+ * 0 => default + * 1 => long + * 2 => int + * 3 => short + * 4 => byte + * 5 => true + * 6 => false + * 7 ~ 127 => string len + 7 + */ + val metaByte = Map("default" -> 0, "long" -> 1, "int" -> 2, "short" -> 3, + "byte" -> 4, "true" -> 5, "false" -> 6).map { + case (k, v) => (k, v.toByte) + } + val metaByteRev = metaByte.map { case (k, v) => (v.toByte, k) } ++ metaByte.map { case (k, v) => ((-v).toByte, k) } + + def maxIdVal(dataType: String) = { + dataType match { + case "string" => InnerVal.withStr((0 until (Byte.MaxValue - stringLenOffset)).map("~").mkString) + case "long" => InnerVal.withLong(Long.MaxValue) + case "bool" => InnerVal.withBoolean(true) + case _ => throw IllegalDataTypeException(dataType) + } + } + + def minIdVal(dataType: String) = { + dataType match { + case "string" => InnerVal.withStr("") + case "long" => InnerVal.withLong(1) + case "bool" => InnerVal.withBoolean(false) + case _ => throw IllegalDataTypeException(dataType) + } + } + + def fromBytes(bytes: Array[Byte], offset: Int, len: Int, version: String = DEFAULT_VERSION, isVertexId: Boolean = false): (InnerVal, Int) = { + var pos = offset + // + val header = bytes(pos) + // logger.debug(s"${bytes(offset)}: ${bytes.toList.slice(pos, bytes.length)}") + pos += 1 + + var numOfBytesUsed = 0 + val (longV, strV, boolV) = metaByteRev.get(header) match { + case Some(s) => + s match { + case "default" => + (None, None, None) + case "true" => + numOfBytesUsed = 0 + (None, None, Some(true)) + case "false" => + numOfBytesUsed = 0 + (None, None, Some(false)) + case "byte" => + numOfBytesUsed = 1 + val b = bytes(pos) + val value = if (b >= 0) Byte.MaxValue - b else Byte.MinValue - b - 1 + (Some(value.toLong), None, None) + case "short" => + numOfBytesUsed = 2 + val b = Bytes.toShort(bytes, pos, 2) + val value = if (b >= 0) Short.MaxValue - b else Short.MinValue - b - 1 + (Some(value.toLong), None, None) + case "int" => + numOfBytesUsed = 4 + val b = Bytes.toInt(bytes, pos, 4) + val value = if (b >= 0) 
Int.MaxValue - b else Int.MinValue - b - 1 + (Some(value.toLong), None, None) + case "long" => + numOfBytesUsed = 8 + val b = Bytes.toLong(bytes, pos, 8) + val value = if (b >= 0) Long.MaxValue - b else Long.MinValue - b - 1 + (Some(value.toLong), None, None) + } + case _ => // string + val strLen = header - stringLenOffset + numOfBytesUsed = strLen + (None, Some(Bytes.toString(bytes, pos, strLen)), None) + } + + (InnerVal(longV, strV, boolV), numOfBytesUsed + 1) + } + + def withLong(l: Long): InnerVal = { + // if (l < 0) throw new IllegalDataRangeException("value shoudl be >= 0") + InnerVal(Some(l), None, None) + } + + def withStr(s: String): InnerVal = { + InnerVal(None, Some(s), None) + } + + def withBoolean(b: Boolean): InnerVal = { + InnerVal(None, None, Some(b)) + } + + /** + * In natural order + * -129, -128 , -2, -1 < 0 < 1, 2, 127, 128 + * + * In byte order + * 0 < 1, 2, 127, 128 < -129, -128, -2, -1 + * + */ + def transform(l: Long): (Byte, Array[Byte]) = { + if (Byte.MinValue <= l && l <= Byte.MaxValue) { + // val value = if (l < 0) l - Byte.MinValue else l + Byte.MinValue + val key = if (l >= 0) metaByte("byte") else -metaByte("byte") + val value = if (l >= 0) Byte.MaxValue - l else Byte.MinValue - l - 1 + val valueBytes = Array.fill(1)(value.toByte) + (key.toByte, valueBytes) + } else if (Short.MinValue <= l && l <= Short.MaxValue) { + val key = if (l >= 0) metaByte("short") else -metaByte("short") + val value = if (l >= 0) Short.MaxValue - l else Short.MinValue - l - 1 + val valueBytes = Bytes.toBytes(value.toShort) + (key.toByte, valueBytes) + } else if (Int.MinValue <= l && l <= Int.MaxValue) { + val key = if (l >= 0) metaByte("int") else -metaByte("int") + val value = if (l >= 0) Int.MaxValue - l else Int.MinValue - l - 1 + val valueBytes = Bytes.toBytes(value.toInt) + (key.toByte, valueBytes) + } else if (Long.MinValue <= l && l <= Long.MaxValue) { + val key = if (l >= 0) metaByte("long") else -metaByte("long") + val value = if (l >= 0) 
Long.MaxValue - l else Long.MinValue - l - 1 + val valueBytes = Bytes.toBytes(value.toLong) + (key.toByte, valueBytes) + } else { + throw new Exception(s"InnerVal range is out: $l") + } + } +} + +case class InnerVal(longV: Option[Long], strV: Option[String], boolV: Option[Boolean]) + extends HBaseSerializable with InnerValLike { + + import InnerVal._ + + lazy val isDefault = longV.isEmpty && strV.isEmpty && boolV.isEmpty + val value = (longV, strV, boolV) match { + case (Some(l), None, None) => l + case (None, Some(s), None) => s + case (None, None, Some(b)) => b + case _ => throw new Exception(s"InnerVal should be [long/integeer/short/byte/string/boolean]") + } + def valueType = (longV, strV, boolV) match { + case (Some(l), None, None) => "long" + case (None, Some(s), None) => "string" + case (None, None, Some(b)) => "boolean" + case _ => throw new Exception(s"InnerVal should be [long/integeer/short/byte/string/boolean]") + } + + def compare(other: InnerValLike): Int = { + if (!other.isInstanceOf[InnerVal]) { + throw new RuntimeException(s"compare between $this vs $other is not supported") + } else { +// (value, other.value) match { +// case (v1: Long, v2: Long) => v1.compare(v2) +// case (b1: Boolean, b2: Boolean) => b1.compare(b2) +// case (s1: String, s2: String) => s1.compare(s2) +// case _ => throw new Exception("Please check a type of the compare operands") +// } + Bytes.compareTo(bytes, other.bytes) * -1 + } + } + + def +(other: InnerValLike) = { + if (!other.isInstanceOf[InnerVal]) { + throw new RuntimeException(s"+ between $this vs $other is not supported") + } else { + (value, other.value) match { + case (v1: Long, v2: Long) => InnerVal.withLong(v1 + v2) + case (b1: Boolean, b2: Boolean) => InnerVal.withBoolean(if (b2) !b1 else b1) + case _ => throw new Exception("Please check a type of the incr operands") + } + } + } + + def bytes = { + val (meta, valBytes) = (longV, strV, boolV) match { + case (None, None, None) => + (metaByte("default"), 
Array.empty[Byte]) + case (Some(l), None, None) => + transform(l) + case (None, None, Some(b)) => + val meta = if (b) metaByte("true") else metaByte("false") + (meta, Array.empty[Byte]) + case (None, Some(s), None) => + val sBytes = Bytes.toBytes(s) + if (sBytes.length > maxStringLen) { + throw new IllegalDataTypeException(s"string in innerVal maxSize is $maxStringLen, given ${sBytes.length}") + } + assert(sBytes.length <= maxStringLen) + val meta = (stringLenOffset + sBytes.length).toByte + (meta, sBytes) + case _ => throw new IllegalDataTypeException("innerVal data type should be [long/string/bool]") + } + Bytes.add(Array.fill(1)(meta.toByte), valBytes) + } + + override def toString(): String = { + value.toString + } + override def hashKey(dataType: String): Int = { + value.toString.hashCode() + } + override def toIdString(): String = { + value.toString + } +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2core/src/main/scala/org/apache/s2graph/core/types/v2/InnerVal.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/types/v2/InnerVal.scala b/s2core/src/main/scala/org/apache/s2graph/core/types/v2/InnerVal.scala new file mode 100644 index 0000000..a511f17 --- /dev/null +++ b/s2core/src/main/scala/org/apache/s2graph/core/types/v2/InnerVal.scala @@ -0,0 +1,160 @@ +package org.apache.s2graph.core.types.v2 + +import org.apache.hadoop.hbase.util._ +import org.apache.s2graph.core.types +import org.apache.s2graph.core.types.{HBaseDeserializableWithIsVertexId, HBaseSerializable, HBaseType, InnerValLike} + +object InnerVal extends HBaseDeserializableWithIsVertexId { + + import HBaseType._ + + val order = Order.DESCENDING + + def fromBytes(bytes: Array[Byte], + offset: Int, + len: Int, + version: String = DEFAULT_VERSION, + isVertexId: Boolean = false): (InnerVal, Int) = { + val pbr = new SimplePositionedByteRange(bytes) + pbr.setPosition(offset) + val startPos = 
pbr.getPosition + if (bytes(offset) == -1 | bytes(offset) == 0) { + /** simple boolean */ + val boolean = order match { + case Order.DESCENDING => bytes(offset) == 0 + case _ => bytes(offset) == -1 + } + (InnerVal(boolean), 1) + } + else { + if (OrderedBytes.isNumeric(pbr)) { + val numeric = OrderedBytes.decodeNumericAsBigDecimal(pbr) + if (isVertexId) (InnerVal(numeric.longValue()), pbr.getPosition - startPos) + else (InnerVal(BigDecimal(numeric)), pbr.getPosition - startPos) +// (InnerVal(numeric.doubleValue()), pbr.getPosition - startPos) +// (InnerVal(BigDecimal(numeric)), pbr.getPosition - startPos) + } else if (OrderedBytes.isText(pbr)) { + val str = OrderedBytes.decodeString(pbr) + (InnerVal(str), pbr.getPosition - startPos) + } else if (OrderedBytes.isBlobVar(pbr)) { + val blobVar = OrderedBytes.decodeBlobVar(pbr) + (InnerVal(blobVar), pbr.getPosition - startPos) + } else { + throw new RuntimeException("!!") + } + } + } +} + +case class InnerVal(value: Any) extends HBaseSerializable with InnerValLike { + + import types.InnerVal._ + + def bytes: Array[Byte] = { + val ret = value match { + case b: Boolean => + + /** since OrderedBytes header start from 0x05, it is safe to use -1, 0 + * for decreasing order (true, false) */ + // Bytes.toBytes(b) + order match { + case Order.DESCENDING => if (b) Array(0.toByte) else Array(-1.toByte) + case _ => if (!b) Array(0.toByte) else Array(-1.toByte) + } + case d: Double => + val num = BigDecimal(d) + val pbr = numByteRange(num) + val len = OrderedBytes.encodeNumeric(pbr, num.bigDecimal, order) + pbr.getBytes().take(len) + case l: Long => + val num = BigDecimal(l) + val pbr = numByteRange(num) + val len = OrderedBytes.encodeNumeric(pbr, num.bigDecimal, order) + pbr.getBytes().take(len) + case i: Int => + val num = BigDecimal(i) + val pbr = numByteRange(num) + val len = OrderedBytes.encodeNumeric(pbr, num.bigDecimal, order) + pbr.getBytes().take(len) + case sh: Short => + val num = BigDecimal(sh) + val pbr = 
numByteRange(num) + val len = OrderedBytes.encodeNumeric(pbr, num.bigDecimal, order) + pbr.getBytes().take(len) + case b: Byte => + val num = BigDecimal(b) + val pbr = numByteRange(num) + val len = OrderedBytes.encodeNumeric(pbr, num.bigDecimal, order) + pbr.getBytes().take(len) + + + case b: BigDecimal => + val pbr = numByteRange(b) + val len = OrderedBytes.encodeNumeric(pbr, b.bigDecimal, order) + pbr.getBytes().take(len) + case s: String => + val pbr = new SimplePositionedMutableByteRange(s.getBytes.length + 3) + val len = OrderedBytes.encodeString(pbr, s, order) + pbr.getBytes().take(len) + case blob: Array[Byte] => + val len = OrderedBytes.blobVarEncodedLength(blob.length) + val pbr = new SimplePositionedMutableByteRange(len) + val totalLen = OrderedBytes.encodeBlobVar(pbr, blob, order) + pbr.getBytes().take(totalLen) + } + // println(s"$value => ${ret.toList}, ${ret.length}") + ret + } + +// + override def hashKey(dataType: String): Int = { + if (value.isInstanceOf[String]) { + // since we use dummy stringn value for degree edge. 
+ value.toString.hashCode() + } else { + dataType match { + case BYTE => value.asInstanceOf[BigDecimal].bigDecimal.byteValue().hashCode() + case FLOAT => value.asInstanceOf[BigDecimal].bigDecimal.floatValue().hashCode() + case DOUBLE => value.asInstanceOf[BigDecimal].bigDecimal.doubleValue().hashCode() + case LONG => value.asInstanceOf[BigDecimal].bigDecimal.longValue().hashCode() + case INT => value.asInstanceOf[BigDecimal].bigDecimal.intValue().hashCode() + case SHORT => value.asInstanceOf[BigDecimal].bigDecimal.shortValue().hashCode() + case STRING => value.toString.hashCode + case _ => throw new RuntimeException(s"NotSupportede type: $dataType") + } + } + } + + def compare(other: InnerValLike): Int = { + if (!other.isInstanceOf[InnerValLike]) + throw new RuntimeException(s"compare $this vs $other") + Bytes.compareTo(bytes, other.bytes) * -1 + } + + def +(other: InnerValLike): InnerValLike = { + if (!other.isInstanceOf[InnerValLike]) + throw new RuntimeException(s"+ $this, $other") + + (value, other.value) match { + case (v1: BigDecimal, v2: BigDecimal) => new InnerVal(BigDecimal(v1.bigDecimal.add(v2.bigDecimal))) + case _ => throw new RuntimeException("+ operation on inner val is for big decimal pair") + } + } + + //need to be removed ?? 
+ override def toString(): String = { +// value.toString() + value match { + case n: BigDecimal => n.bigDecimal.toPlainString + case _ => value.toString + } + } + + override def toIdString(): String = { + value match { + case n: BigDecimal => n.bigDecimal.longValue().toString() + case _ => value.toString + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2core/src/main/scala/org/apache/s2graph/core/utils/DeferCache.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/utils/DeferCache.scala b/s2core/src/main/scala/org/apache/s2graph/core/utils/DeferCache.scala new file mode 100644 index 0000000..59452df --- /dev/null +++ b/s2core/src/main/scala/org/apache/s2graph/core/utils/DeferCache.scala @@ -0,0 +1,82 @@ +package org.apache.s2graph.core.utils + +import java.util.concurrent.TimeUnit + +import com.google.common.cache.CacheBuilder +import com.stumbleupon.async.Deferred +import com.typesafe.config.Config + +import scala.concurrent.ExecutionContext + +class DeferCache[R](config: Config)(implicit ex: ExecutionContext) { + + import Extensions.DeferOps + + type Value = (Long, Deferred[R]) + + private val maxSize = config.getInt("future.cache.max.size") + private val expireAfterWrite = config.getInt("future.cache.expire.after.write") + private val expireAfterAccess = config.getInt("future.cache.expire.after.access") + + private val futureCache = CacheBuilder.newBuilder() + .initialCapacity(maxSize) + .concurrencyLevel(Runtime.getRuntime.availableProcessors()) + .expireAfterWrite(expireAfterWrite, TimeUnit.MILLISECONDS) + .expireAfterAccess(expireAfterAccess, TimeUnit.MILLISECONDS) + .maximumSize(maxSize).build[java.lang.Long, (Long, Deferred[R])]() + + + def asMap() = futureCache.asMap() + + def getIfPresent(cacheKey: Long): Value = futureCache.getIfPresent(cacheKey) + + private def checkAndExpire(cacheKey: Long, + cachedAt: Long, + cacheTTL: Long, + 
oldDefer: Deferred[R])(op: => Deferred[R]): Deferred[R] = { + if (System.currentTimeMillis() >= cachedAt + cacheTTL) { + // future is too old. so need to expire and fetch new data from storage. + futureCache.asMap().remove(cacheKey) + + val newPromise = new Deferred[R]() + val now = System.currentTimeMillis() + + futureCache.asMap().putIfAbsent(cacheKey, (now, newPromise)) match { + case null => + // only one thread succeed to come here concurrently + // initiate fetch to storage then add callback on complete to finish promise. + op withCallback { value => + newPromise.callback(value) + value + } + newPromise + case (cachedAt, oldDefer) => oldDefer + } + } else { + // future is not to old so reuse it. + oldDefer + } + } + def getOrElseUpdate(cacheKey: Long, cacheTTL: Long)(op: => Deferred[R]): Deferred[R] = { + val cacheVal = futureCache.getIfPresent(cacheKey) + cacheVal match { + case null => + val promise = new Deferred[R]() + val now = System.currentTimeMillis() + val (cachedAt, defer) = futureCache.asMap().putIfAbsent(cacheKey, (now, promise)) match { + case null => + op.withCallback { value => + promise.callback(value) + value + } + (now, promise) + case oldVal => oldVal + } + checkAndExpire(cacheKey, cacheTTL, cachedAt, defer)(op) + + case (cachedAt, defer) => + checkAndExpire(cacheKey, cacheTTL, cachedAt, defer)(op) + } + } +} + http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2core/src/main/scala/org/apache/s2graph/core/utils/Extentions.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/utils/Extentions.scala b/s2core/src/main/scala/org/apache/s2graph/core/utils/Extentions.scala new file mode 100644 index 0000000..5805c32 --- /dev/null +++ b/s2core/src/main/scala/org/apache/s2graph/core/utils/Extentions.scala @@ -0,0 +1,73 @@ +package org.apache.s2graph.core.utils + +import com.stumbleupon.async.{Callback, Deferred} +import com.typesafe.config.Config 
+ +import scala.concurrent.{ExecutionContext, Future, Promise} + +object Extensions { + + + def retryOnSuccess[T](maxRetryNum: Int, n: Int = 1)(fn: => Future[T])(shouldStop: T => Boolean)(implicit ex: ExecutionContext): Future[T] = n match { + case i if n <= maxRetryNum => + fn.flatMap { result => + if (!shouldStop(result)) { + logger.info(s"retryOnSuccess $n") + retryOnSuccess(maxRetryNum, n + 1)(fn)(shouldStop) + } else { + Future.successful(result) + } + } + case _ => fn + } + + def retryOnFailure[T](maxRetryNum: Int, n: Int = 1)(fn: => Future[T])(fallback: => T)(implicit ex: ExecutionContext): Future[T] = n match { + case i if n <= maxRetryNum => + fn recoverWith { case t: Throwable => + logger.info(s"retryOnFailure $n $t") + retryOnFailure(maxRetryNum, n + 1)(fn)(fallback) + } + case _ => + Future.successful(fallback) + } + + + implicit class DeferOps[T](d: Deferred[T])(implicit ex: ExecutionContext) { + + def withCallback[R](op: T => R): Deferred[R] = { + d.addCallback(new Callback[R, T] { + override def call(arg: T): R = op(arg) + }) + } + + def recoverWith(op: Exception => T): Deferred[T] = { + d.addErrback(new Callback[Deferred[T], Exception] { + override def call(e: Exception): Deferred[T] = Deferred.fromResult(op(e)) + }) + } + + + def toFuture: Future[T] = { + val promise = Promise[T] + + d.addBoth(new Callback[Unit, T] { + def call(arg: T) = arg match { + case e: Exception => promise.failure(e) + case _ => promise.success(arg) + } + }) + + promise.future + } + + def toFutureWith(fallback: => T): Future[T] = { + toFuture recoverWith { case t: Throwable => Future.successful(fallback) } + } + + } + + implicit class ConfigOps(config: Config) { + def getBooleanWithFallback(key: String, defaultValue: Boolean): Boolean = + if (config.hasPath(key)) config.getBoolean(key) else defaultValue + } +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2core/src/main/scala/org/apache/s2graph/core/utils/FutureCache.scala 
---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/utils/FutureCache.scala b/s2core/src/main/scala/org/apache/s2graph/core/utils/FutureCache.scala new file mode 100644 index 0000000..b23566e --- /dev/null +++ b/s2core/src/main/scala/org/apache/s2graph/core/utils/FutureCache.scala @@ -0,0 +1,82 @@ +package org.apache.s2graph.core.utils + +import java.util.concurrent.TimeUnit + +import com.google.common.cache.CacheBuilder +import com.typesafe.config.Config + +import scala.concurrent.{ExecutionContext, Future, Promise} + + +class FutureCache[R](config: Config)(implicit ex: ExecutionContext) { + + type Value = (Long, Future[R]) + + private val maxSize = config.getInt("future.cache.max.size") + private val expireAfterWrite = config.getInt("future.cache.expire.after.write") + private val expireAfterAccess = config.getInt("future.cache.expire.after.access") + + private val futureCache = CacheBuilder.newBuilder() + .initialCapacity(maxSize) + .concurrencyLevel(Runtime.getRuntime.availableProcessors()) + .expireAfterWrite(expireAfterWrite, TimeUnit.MILLISECONDS) + .expireAfterAccess(expireAfterAccess, TimeUnit.MILLISECONDS) + .maximumSize(maxSize).build[java.lang.Long, (Long, Promise[R])]() + + + def asMap() = futureCache.asMap() + + def getIfPresent(cacheKey: Long): Value = { + val (cachedAt, promise) = futureCache.getIfPresent(cacheKey) + (cachedAt, promise.future) + } + + private def checkAndExpire(cacheKey: Long, + cachedAt: Long, + cacheTTL: Long, + oldFuture: Future[R])(op: => Future[R]): Future[R] = { + if (System.currentTimeMillis() >= cachedAt + cacheTTL) { + // future is too old. so need to expire and fetch new data from storage. 
+ futureCache.asMap().remove(cacheKey) + + val newPromise = Promise[R] + val now = System.currentTimeMillis() + + futureCache.asMap().putIfAbsent(cacheKey, (now, newPromise)) match { + case null => + // only one thread succeeds in getting here concurrently + // initiate fetch to storage then add callback on complete to finish promise. + op.onSuccess { case value => + newPromise.success(value) + value + } + newPromise.future + case (cachedAt, oldPromise) => oldPromise.future + } + } else { + // future is not too old, so reuse it. + oldFuture + } + } + def getOrElseUpdate(cacheKey: Long, cacheTTL: Long)(op: => Future[R]): Future[R] = { + val cacheVal = futureCache.getIfPresent(cacheKey) + cacheVal match { + case null => + val promise = Promise[R] + val now = System.currentTimeMillis() + val (cachedAt, cachedPromise) = futureCache.asMap().putIfAbsent(cacheKey, (now, promise)) match { + case null => + op.onSuccess { case value => + promise.success(value) + value + } + (now, promise) + case oldVal => oldVal + } + checkAndExpire(cacheKey, cacheTTL, cachedAt, cachedPromise.future)(op) + + case (cachedAt, cachedPromise) => + checkAndExpire(cacheKey, cacheTTL, cachedAt, cachedPromise.future)(op) + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2core/src/main/scala/org/apache/s2graph/core/utils/Logger.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/utils/Logger.scala b/s2core/src/main/scala/org/apache/s2graph/core/utils/Logger.scala new file mode 100644 index 0000000..6b02a00 --- /dev/null +++ b/s2core/src/main/scala/org/apache/s2graph/core/utils/Logger.scala @@ -0,0 +1,44 @@ +package org.apache.s2graph.core.utils + +import org.slf4j.LoggerFactory +import play.api.libs.json.JsValue + +import scala.language.{higherKinds, implicitConversions} + +object logger { + + trait Loggable[T] { + def toLogMessage(msg: T): String + } + + object Loggable { + implicit val 
stringLoggable = new Loggable[String] { + def toLogMessage(msg: String) = msg + } + + implicit def numericLoggable[T: Numeric] = new Loggable[T] { + def toLogMessage(msg: T) = msg.toString + } + + implicit val jsonLoggable = new Loggable[JsValue] { + def toLogMessage(msg: JsValue) = msg.toString() + } + + implicit val booleanLoggable = new Loggable[Boolean] { + def toLogMessage(msg: Boolean) = msg.toString() + } + } + + private val logger = LoggerFactory.getLogger("application") + private val errorLogger = LoggerFactory.getLogger("error") + + def info[T: Loggable](msg: => T) = logger.info(implicitly[Loggable[T]].toLogMessage(msg)) + + def debug[T: Loggable](msg: => T) = logger.debug(implicitly[Loggable[T]].toLogMessage(msg)) + + def error[T: Loggable](msg: => T, exception: => Throwable) = errorLogger.error(implicitly[Loggable[T]].toLogMessage(msg), exception) + + def error[T: Loggable](msg: => T) = errorLogger.error(implicitly[Loggable[T]].toLogMessage(msg)) +} + + http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2core/src/main/scala/org/apache/s2graph/core/utils/SafeUpdateCache.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/utils/SafeUpdateCache.scala b/s2core/src/main/scala/org/apache/s2graph/core/utils/SafeUpdateCache.scala new file mode 100644 index 0000000..3c67e42 --- /dev/null +++ b/s2core/src/main/scala/org/apache/s2graph/core/utils/SafeUpdateCache.scala @@ -0,0 +1,62 @@ +package org.apache.s2graph.core.utils + +import java.util.concurrent.atomic.AtomicBoolean + +import com.google.common.cache.CacheBuilder + +import scala.concurrent.{ExecutionContext, Future} +import scala.util.{Failure, Success} + +object SafeUpdateCache { + + case class CacheKey(key: String) + +} + +class SafeUpdateCache[T](prefix: String, maxSize: Int, ttl: Int)(implicit executionContext: ExecutionContext) { + + import SafeUpdateCache._ + + implicit class StringOps(key: String) 
{ + def toCacheKey = new CacheKey(prefix + ":" + key) + } + + def toTs() = (System.currentTimeMillis() / 1000).toInt + + private val cache = CacheBuilder.newBuilder().maximumSize(maxSize).build[CacheKey, (T, Int, AtomicBoolean)]() + + def put(key: String, value: T) = cache.put(key.toCacheKey, (value, toTs, new AtomicBoolean(false))) + + def invalidate(key: String) = cache.invalidate(key.toCacheKey) + + def withCache(key: String)(op: => T): T = { + val cacheKey = key.toCacheKey + val cachedValWithTs = cache.getIfPresent(cacheKey) + + if (cachedValWithTs == null) { + // fetch and update cache. + val newValue = op + cache.put(cacheKey, (newValue, toTs(), new AtomicBoolean(false))) + newValue + } else { + val (cachedVal, updatedAt, isUpdating) = cachedValWithTs + if (toTs() < updatedAt + ttl) cachedVal // in cache TTL + else { + val running = isUpdating.getAndSet(true) + if (running) cachedVal + else { + Future(op)(executionContext) onComplete { + case Failure(ex) => + cache.put(cacheKey, (cachedVal, toTs(), new AtomicBoolean(false))) // keep old value + logger.error(s"withCache update failed: $cacheKey") + case Success(newValue) => + cache.put(cacheKey, (newValue, toTs(), new AtomicBoolean(false))) // update new value + logger.info(s"withCache update success: $cacheKey") + } + cachedVal + } + } + } + } +} +
