http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/indexedge/tall/IndexEdgeSerializable.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/indexedge/tall/IndexEdgeSerializable.scala b/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/indexedge/tall/IndexEdgeSerializable.scala deleted file mode 100644 index 46ad15f..0000000 --- a/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/indexedge/tall/IndexEdgeSerializable.scala +++ /dev/null @@ -1,53 +0,0 @@ -package com.kakao.s2graph.core.storage.serde.indexedge.tall - -import com.kakao.s2graph.core.mysqls.LabelMeta -import com.kakao.s2graph.core.storage.{SKeyValue, Serializable, StorageSerializable} -import com.kakao.s2graph.core.types.VertexId -import com.kakao.s2graph.core.{GraphUtil, IndexEdge} -import org.apache.hadoop.hbase.util.Bytes - - -class IndexEdgeSerializable(indexEdge: IndexEdge) extends Serializable[IndexEdge] { - import StorageSerializable._ - - val label = indexEdge.label - val table = label.hbaseTableName.getBytes() - val cf = Serializable.edgeCf - - val idxPropsMap = indexEdge.orders.toMap - val idxPropsBytes = propsToBytes(indexEdge.orders) - - override def toKeyValues: Seq[SKeyValue] = { - val srcIdBytes = VertexId.toSourceVertexId(indexEdge.srcVertex.id).bytes - val labelWithDirBytes = indexEdge.labelWithDir.bytes - val labelIndexSeqWithIsInvertedBytes = labelOrderSeqWithIsInverted(indexEdge.labelIndexSeq, isInverted = false) - - val row = Bytes.add(srcIdBytes, labelWithDirBytes, labelIndexSeqWithIsInvertedBytes) - // logger.error(s"${row.toList}\n${srcIdBytes.toList}\n${labelWithDirBytes.toList}\n${labelIndexSeqWithIsInvertedBytes.toList}") - - val qualifier = - if (indexEdge.degreeEdge) Array.empty[Byte] - else - idxPropsMap.get(LabelMeta.toSeq) match { - case None => Bytes.add(idxPropsBytes, VertexId.toTargetVertexId(indexEdge.tgtVertex.id).bytes) - case Some(vId) => idxPropsBytes - } - - /** TODO search usage of op byte. if there is no, then remove opByte */ - val rowBytes = Bytes.add(row, Array.fill(1)(GraphUtil.defaultOpByte), qualifier) - // val qualifierBytes = Array.fill(1)(indexEdge.op) - val qualifierBytes = Array.empty[Byte] - - val value = - if (indexEdge.degreeEdge) - Bytes.toBytes(indexEdge.propsWithTs(LabelMeta.degreeSeq).innerVal.toString().toLong) - else if (indexEdge.op == GraphUtil.operations("incrementCount")) - Bytes.toBytes(indexEdge.propsWithTs(LabelMeta.countSeq).innerVal.toString().toLong) - else propsToKeyValues(indexEdge.metas.toSeq) - - val kv = SKeyValue(table, rowBytes, cf, qualifierBytes, value, indexEdge.version) - - // logger.debug(s"[Ser]: ${kv.row.toList}, ${kv.qualifier.toList}, ${kv.value.toList}") - Seq(kv) - } - }
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala b/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala deleted file mode 100644 index f83dd1f..0000000 --- a/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala +++ /dev/null @@ -1,116 +0,0 @@ -package com.kakao.s2graph.core.storage.serde.indexedge.wide - -import com.kakao.s2graph.core.mysqls.LabelMeta -import com.kakao.s2graph.core.storage.StorageDeserializable._ -import com.kakao.s2graph.core.storage.{CanSKeyValue, Deserializable, SKeyValue, StorageDeserializable} -import com.kakao.s2graph.core.types._ -import com.kakao.s2graph.core.{GraphUtil, IndexEdge, QueryParam, Vertex} - -class IndexEdgeDeserializable(bytesToLongFunc: (Array[Byte], Int) => Long = bytesToLong) extends Deserializable[IndexEdge] { - import StorageDeserializable._ - - type QualifierRaw = (Array[(Byte, InnerValLike)], VertexId, Byte, Boolean, Int) - type ValueRaw = (Array[(Byte, InnerValLike)], Int) - - private def parseDegreeQualifier(kv: SKeyValue, version: String): QualifierRaw = { - // val degree = Bytes.toLong(kv.value) - val degree = bytesToLongFunc(kv.value, 0) - val idxPropsRaw = Array(LabelMeta.degreeSeq -> InnerVal.withLong(degree, version)) - val tgtVertexIdRaw = VertexId(HBaseType.DEFAULT_COL_ID, InnerVal.withStr("0", version)) - (idxPropsRaw, tgtVertexIdRaw, GraphUtil.operations("insert"), false, 0) - } - - private def parseQualifier(kv: SKeyValue, version: String): QualifierRaw = { - var qualifierLen = 0 - var pos = 0 - val (idxPropsRaw, idxPropsLen, tgtVertexIdRaw, tgtVertexIdLen) = { - val (props, endAt) = bytesToProps(kv.qualifier, pos, version) - pos = endAt - qualifierLen += endAt - val (tgtVertexId, tgtVertexIdLen) = if (endAt == kv.qualifier.length) { - (HBaseType.defaultTgtVertexId, 0) - } else { - TargetVertexId.fromBytes(kv.qualifier, endAt, kv.qualifier.length, version) - } - qualifierLen += tgtVertexIdLen - (props, endAt, tgtVertexId, tgtVertexIdLen) - } - val (op, opLen) = - if (kv.qualifier.length == qualifierLen) (GraphUtil.defaultOpByte, 0) - else (kv.qualifier(qualifierLen), 1) - - qualifierLen += opLen - - (idxPropsRaw, tgtVertexIdRaw, op, tgtVertexIdLen != 0, qualifierLen) - } - - private def parseValue(kv: SKeyValue, version: String): ValueRaw = { - val (props, endAt) = bytesToKeyValues(kv.value, 0, kv.value.length, version) - (props, endAt) - } - - private def parseDegreeValue(kv: SKeyValue, version: String): ValueRaw = { - (Array.empty[(Byte, InnerValLike)], 0) - } - - override def fromKeyValuesInner[T: CanSKeyValue](queryParam: QueryParam, - _kvs: Seq[T], - version: String, - cacheElementOpt: Option[IndexEdge]): IndexEdge = { - assert(_kvs.size == 1) - - val kvs = _kvs.map { kv => implicitly[CanSKeyValue[T]].toSKeyValue(kv) } - - val kv = kvs.head - val (srcVertexId, labelWithDir, labelIdxSeq, _, _) = cacheElementOpt.map { e => - (e.srcVertex.id, e.labelWithDir, e.labelIndexSeq, false, 0) - }.getOrElse(parseRow(kv, version)) - - val (idxPropsRaw, tgtVertexIdRaw, op, tgtVertexIdInQualifier, _) = - if (kv.qualifier.isEmpty) parseDegreeQualifier(kv, version) - else parseQualifier(kv, version) - - val (props, _) = if (op == GraphUtil.operations("incrementCount")) { - // val countVal = Bytes.toLong(kv.value) - val countVal = bytesToLongFunc(kv.value, 0) - val dummyProps = Array(LabelMeta.countSeq -> InnerVal.withLong(countVal, version)) - (dummyProps, 8) - } else if (kv.qualifier.isEmpty) { - parseDegreeValue(kv, version) - } else { - parseValue(kv, version) - } - - val index = queryParam.label.indicesMap.getOrElse(labelIdxSeq, throw new RuntimeException(s"invalid index seq: ${queryParam.label.id.get}, ${labelIdxSeq}")) - - - // assert(kv.qualifier.nonEmpty && index.metaSeqs.size == idxPropsRaw.size) - - val idxProps = for { - (seq, (k, v)) <- index.metaSeqs.zip(idxPropsRaw) - } yield { - if (k == LabelMeta.degreeSeq) k -> v - else seq -> v - } - - val idxPropsMap = idxProps.toMap - val tgtVertexId = if (tgtVertexIdInQualifier) { - idxPropsMap.get(LabelMeta.toSeq) match { - case None => tgtVertexIdRaw - case Some(vId) => TargetVertexId(HBaseType.DEFAULT_COL_ID, vId) - } - } else tgtVertexIdRaw - - val _mergedProps = (idxProps ++ props).toMap - val mergedProps = - if (_mergedProps.contains(LabelMeta.timeStampSeq)) _mergedProps - else _mergedProps + (LabelMeta.timeStampSeq -> InnerVal.withLong(kv.timestamp, version)) - - // logger.error(s"$mergedProps") - // val ts = mergedProps(LabelMeta.timeStampSeq).toString().toLong - - val ts = kv.timestamp - IndexEdge(Vertex(srcVertexId, ts), Vertex(tgtVertexId, ts), labelWithDir, op, ts, labelIdxSeq, mergedProps) - - } - } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/indexedge/wide/IndexEdgeSerializable.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/indexedge/wide/IndexEdgeSerializable.scala b/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/indexedge/wide/IndexEdgeSerializable.scala deleted file mode 100644 index 716b6fb..0000000 --- a/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/indexedge/wide/IndexEdgeSerializable.scala +++ /dev/null @@ -1,53 +0,0 @@ -package com.kakao.s2graph.core.storage.serde.indexedge.wide - -import com.kakao.s2graph.core.mysqls.LabelMeta -import com.kakao.s2graph.core.storage.{SKeyValue, Serializable, StorageSerializable} -import com.kakao.s2graph.core.types.VertexId -import com.kakao.s2graph.core.{GraphUtil, IndexEdge} -import org.apache.hadoop.hbase.util.Bytes - - -class IndexEdgeSerializable(indexEdge: IndexEdge) extends Serializable[IndexEdge] { - import StorageSerializable._ - - val label = indexEdge.label - val table = label.hbaseTableName.getBytes() - val cf = Serializable.edgeCf - - val idxPropsMap = indexEdge.orders.toMap - val idxPropsBytes = propsToBytes(indexEdge.orders) - - override def toKeyValues: Seq[SKeyValue] = { - val srcIdBytes = VertexId.toSourceVertexId(indexEdge.srcVertex.id).bytes - val labelWithDirBytes = indexEdge.labelWithDir.bytes - val labelIndexSeqWithIsInvertedBytes = labelOrderSeqWithIsInverted(indexEdge.labelIndexSeq, isInverted = false) - - val row = Bytes.add(srcIdBytes, labelWithDirBytes, labelIndexSeqWithIsInvertedBytes) - // logger.error(s"${row.toList}\n${srcIdBytes.toList}\n${labelWithDirBytes.toList}\n${labelIndexSeqWithIsInvertedBytes.toList}") - val tgtIdBytes = VertexId.toTargetVertexId(indexEdge.tgtVertex.id).bytes - val qualifier = - if (indexEdge.degreeEdge) Array.empty[Byte] - else { - if (indexEdge.op == GraphUtil.operations("incrementCount")) { - Bytes.add(idxPropsBytes, tgtIdBytes, Array.fill(1)(indexEdge.op)) - } else { - idxPropsMap.get(LabelMeta.toSeq) match { - case None => Bytes.add(idxPropsBytes, tgtIdBytes) - case Some(vId) => idxPropsBytes - } - } - } - - - val value = - if (indexEdge.degreeEdge) - Bytes.toBytes(indexEdge.propsWithTs(LabelMeta.degreeSeq).innerVal.toString().toLong) - else if (indexEdge.op == GraphUtil.operations("incrementCount")) - Bytes.toBytes(indexEdge.propsWithTs(LabelMeta.countSeq).innerVal.toString().toLong) - else propsToKeyValues(indexEdge.metas.toSeq) - - val kv = SKeyValue(table, row, cf, qualifier, value, indexEdge.version) - - Seq(kv) - } - } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeDeserializable.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeDeserializable.scala b/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeDeserializable.scala deleted file mode 100644 index c97bed6..0000000 --- a/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeDeserializable.scala +++ /dev/null @@ -1,84 +0,0 @@ -package com.kakao.s2graph.core.storage.serde.snapshotedge.tall - -import com.kakao.s2graph.core.mysqls.{LabelIndex, LabelMeta} -import com.kakao.s2graph.core.storage.StorageDeserializable._ -import com.kakao.s2graph.core.storage.{CanSKeyValue, Deserializable, SKeyValue} -import com.kakao.s2graph.core.types.{HBaseType, LabelWithDirection, SourceAndTargetVertexIdPair, SourceVertexId} -import com.kakao.s2graph.core.{Edge, QueryParam, SnapshotEdge, Vertex} -import org.apache.hadoop.hbase.util.Bytes - -class SnapshotEdgeDeserializable extends Deserializable[SnapshotEdge] { - - def statusCodeWithOp(byte: Byte): (Byte, Byte) = { - val statusCode = byte >> 4 - val op = byte & ((1 << 4) - 1) - (statusCode.toByte, op.toByte) - } - - override def fromKeyValuesInner[T: CanSKeyValue](queryParam: QueryParam, - _kvs: Seq[T], - version: String, - cacheElementOpt: Option[SnapshotEdge]): SnapshotEdge = { - val kvs = _kvs.map { kv => implicitly[CanSKeyValue[T]].toSKeyValue(kv) } - assert(kvs.size == 1) - - val kv = kvs.head - val schemaVer = queryParam.label.schemaVersion - val cellVersion = kv.timestamp - /** rowKey */ - def parseRowV3(kv: SKeyValue, version: String) = { - var pos = 0 - val (srcIdAndTgtId, srcIdAndTgtIdLen) = SourceAndTargetVertexIdPair.fromBytes(kv.row, pos, kv.row.length, version) - pos += srcIdAndTgtIdLen - val labelWithDir = LabelWithDirection(Bytes.toInt(kv.row, pos, 4)) - pos += 4 - val (labelIdxSeq, isInverted) = bytesToLabelIndexSeqWithIsInverted(kv.row, pos) - - val rowLen = srcIdAndTgtIdLen + 4 + 1 - (srcIdAndTgtId.srcInnerId, srcIdAndTgtId.tgtInnerId, labelWithDir, labelIdxSeq, isInverted, rowLen) - - } - val (srcInnerId, tgtInnerId, labelWithDir, _, _, _) = cacheElementOpt.map { e => - (e.srcVertex.innerId, e.tgtVertex.innerId, e.labelWithDir, LabelIndex.DefaultSeq, true, 0) - }.getOrElse(parseRowV3(kv, schemaVer)) - - val srcVertexId = SourceVertexId(HBaseType.DEFAULT_COL_ID, srcInnerId) - val tgtVertexId = SourceVertexId(HBaseType.DEFAULT_COL_ID, tgtInnerId) - - val (props, op, ts, statusCode, _pendingEdgeOpt) = { - var pos = 0 - val (statusCode, op) = statusCodeWithOp(kv.value(pos)) - pos += 1 - val (props, endAt) = bytesToKeyValuesWithTs(kv.value, pos, schemaVer) - val kvsMap = props.toMap - val ts = kvsMap(LabelMeta.timeStampSeq).innerVal.toString.toLong - - pos = endAt - val _pendingEdgeOpt = - if (pos == kv.value.length) None - else { - val (pendingEdgeStatusCode, pendingEdgeOp) = statusCodeWithOp(kv.value(pos)) - pos += 1 - // val versionNum = Bytes.toLong(kv.value, pos, 8) - // pos += 8 - val (pendingEdgeProps, endAt) = bytesToKeyValuesWithTs(kv.value, pos, schemaVer) - pos = endAt - val lockTs = Option(Bytes.toLong(kv.value, pos, 8)) - - val pendingEdge = - Edge(Vertex(srcVertexId, cellVersion), - Vertex(tgtVertexId, cellVersion), - labelWithDir, pendingEdgeOp, - cellVersion, pendingEdgeProps.toMap, - statusCode = pendingEdgeStatusCode, lockTs = lockTs) - Option(pendingEdge) - } - - (kvsMap, op, ts, statusCode, _pendingEdgeOpt) - } - - SnapshotEdge(Vertex(srcVertexId, ts), Vertex(tgtVertexId, ts), - labelWithDir, op, cellVersion, props, statusCode = statusCode, - pendingEdgeOpt = _pendingEdgeOpt, lockTs = None) - } -} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeSerializable.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeSerializable.scala b/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeSerializable.scala deleted file mode 100644 index a507b90..0000000 --- a/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeSerializable.scala +++ /dev/null @@ -1,47 +0,0 @@ -package com.kakao.s2graph.core.storage.serde.snapshotedge.tall - -import com.kakao.s2graph.core.SnapshotEdge -import com.kakao.s2graph.core.mysqls.LabelIndex -import com.kakao.s2graph.core.storage.{SKeyValue, Serializable, StorageSerializable} -import com.kakao.s2graph.core.types.SourceAndTargetVertexIdPair -import org.apache.hadoop.hbase.util.Bytes - - -class SnapshotEdgeSerializable(snapshotEdge: SnapshotEdge) extends Serializable[SnapshotEdge] { - import StorageSerializable._ - - val label = snapshotEdge.label - val table = label.hbaseTableName.getBytes() - val cf = Serializable.edgeCf - - def statusCodeWithOp(statusCode: Byte, op: Byte): Array[Byte] = { - val byte = (((statusCode << 4) | op).toByte) - Array.fill(1)(byte.toByte) - } - def valueBytes() = Bytes.add(statusCodeWithOp(snapshotEdge.statusCode, snapshotEdge.op), - propsToKeyValuesWithTs(snapshotEdge.props.toList)) - - override def toKeyValues: Seq[SKeyValue] = { - val srcIdAndTgtIdBytes = SourceAndTargetVertexIdPair(snapshotEdge.srcVertex.innerId, snapshotEdge.tgtVertex.innerId).bytes - val labelWithDirBytes = snapshotEdge.labelWithDir.bytes - val labelIndexSeqWithIsInvertedBytes = labelOrderSeqWithIsInverted(LabelIndex.DefaultSeq, isInverted = true) - - val row = Bytes.add(srcIdAndTgtIdBytes, labelWithDirBytes, labelIndexSeqWithIsInvertedBytes) - - val qualifier = Array.empty[Byte] - - val value = snapshotEdge.pendingEdgeOpt match { - case None => valueBytes() - case Some(pendingEdge) => - val opBytes = statusCodeWithOp(pendingEdge.statusCode, pendingEdge.op) - val versionBytes = Array.empty[Byte] - val propsBytes = propsToKeyValuesWithTs(pendingEdge.propsWithTs.toSeq) - val lockBytes = Bytes.toBytes(pendingEdge.lockTs.get) - - Bytes.add(Bytes.add(valueBytes(), opBytes, versionBytes), Bytes.add(propsBytes, lockBytes)) - } - - val kv = SKeyValue(table, row, cf, qualifier, value, snapshotEdge.version) - Seq(kv) - } -} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeDeserializable.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeDeserializable.scala b/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeDeserializable.scala deleted file mode 100644 index 1174f50..0000000 --- a/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeDeserializable.scala +++ /dev/null @@ -1,70 +0,0 @@ -package com.kakao.s2graph.core.storage.serde.snapshotedge.wide - -import com.kakao.s2graph.core.mysqls.{LabelIndex, LabelMeta} -import com.kakao.s2graph.core.storage.StorageDeserializable._ -import com.kakao.s2graph.core.storage.{CanSKeyValue, Deserializable} -import com.kakao.s2graph.core.types.TargetVertexId -import com.kakao.s2graph.core.{Edge, QueryParam, SnapshotEdge, Vertex} -import org.apache.hadoop.hbase.util.Bytes - -class SnapshotEdgeDeserializable extends Deserializable[SnapshotEdge] { - - def statusCodeWithOp(byte: Byte): (Byte, Byte) = { - val statusCode = byte >> 4 - val op = byte & ((1 << 4) - 1) - (statusCode.toByte, op.toByte) - } - - override def fromKeyValuesInner[T: CanSKeyValue](queryParam: QueryParam, - _kvs: Seq[T], - version: String, - cacheElementOpt: Option[SnapshotEdge]): SnapshotEdge = { - val kvs = _kvs.map { kv => implicitly[CanSKeyValue[T]].toSKeyValue(kv) } - assert(kvs.size == 1) - - val kv = kvs.head - val schemaVer = queryParam.label.schemaVersion - val cellVersion = kv.timestamp - - val (srcVertexId, labelWithDir, _, _, _) = cacheElementOpt.map { e => - (e.srcVertex.id, e.labelWithDir, LabelIndex.DefaultSeq, true, 0) - }.getOrElse(parseRow(kv, schemaVer)) - - val (tgtVertexId, props, op, ts, statusCode, _pendingEdgeOpt) = { - val (tgtVertexId, _) = TargetVertexId.fromBytes(kv.qualifier, 0, kv.qualifier.length, schemaVer) - var pos = 0 - val (statusCode, op) = statusCodeWithOp(kv.value(pos)) - pos += 1 - val (props, endAt) = bytesToKeyValuesWithTs(kv.value, pos, schemaVer) - val kvsMap = props.toMap - val ts = kvsMap(LabelMeta.timeStampSeq).innerVal.toString.toLong - - pos = endAt - val _pendingEdgeOpt = - if (pos == kv.value.length) None - else { - val (pendingEdgeStatusCode, pendingEdgeOp) = statusCodeWithOp(kv.value(pos)) - pos += 1 - // val versionNum = Bytes.toLong(kv.value, pos, 8) - // pos += 8 - val (pendingEdgeProps, endAt) = bytesToKeyValuesWithTs(kv.value, pos, schemaVer) - pos = endAt - val lockTs = Option(Bytes.toLong(kv.value, pos, 8)) - - val pendingEdge = - Edge(Vertex(srcVertexId, cellVersion), - Vertex(tgtVertexId, cellVersion), - labelWithDir, pendingEdgeOp, - cellVersion, pendingEdgeProps.toMap, - statusCode = pendingEdgeStatusCode, lockTs = lockTs) - Option(pendingEdge) - } - - (tgtVertexId, kvsMap, op, ts, statusCode, _pendingEdgeOpt) - } - - SnapshotEdge(Vertex(srcVertexId, ts), Vertex(tgtVertexId, ts), - labelWithDir, op, cellVersion, props, statusCode = statusCode, - pendingEdgeOpt = _pendingEdgeOpt, lockTs = None) - } -} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeSerializable.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeSerializable.scala b/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeSerializable.scala deleted file mode 100644 index e6074d9..0000000 --- a/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeSerializable.scala +++ /dev/null @@ -1,50 +0,0 @@ -package com.kakao.s2graph.core.storage.serde.snapshotedge.wide - -import com.kakao.s2graph.core.SnapshotEdge -import com.kakao.s2graph.core.mysqls.LabelIndex -import com.kakao.s2graph.core.storage.{SKeyValue, Serializable, StorageSerializable} -import com.kakao.s2graph.core.types.VertexId -import org.apache.hadoop.hbase.util.Bytes - - -/** - * this class serialize - * @param snapshotEdge - */ -class SnapshotEdgeSerializable(snapshotEdge: SnapshotEdge) extends Serializable[SnapshotEdge] { - import StorageSerializable._ - - val label = snapshotEdge.label - val table = label.hbaseTableName.getBytes() - val cf = Serializable.edgeCf - - def statusCodeWithOp(statusCode: Byte, op: Byte): Array[Byte] = { - val byte = (((statusCode << 4) | op).toByte) - Array.fill(1)(byte.toByte) - } - def valueBytes() = Bytes.add(statusCodeWithOp(snapshotEdge.statusCode, snapshotEdge.op), - propsToKeyValuesWithTs(snapshotEdge.props.toList)) - - override def toKeyValues: Seq[SKeyValue] = { - val srcIdBytes = VertexId.toSourceVertexId(snapshotEdge.srcVertex.id).bytes - val labelWithDirBytes = snapshotEdge.labelWithDir.bytes - val labelIndexSeqWithIsInvertedBytes = labelOrderSeqWithIsInverted(LabelIndex.DefaultSeq, isInverted = true) - - val row = Bytes.add(srcIdBytes, labelWithDirBytes, labelIndexSeqWithIsInvertedBytes) - val tgtIdBytes = VertexId.toTargetVertexId(snapshotEdge.tgtVertex.id).bytes - - val qualifier = tgtIdBytes - - val value = snapshotEdge.pendingEdgeOpt match { - case None => valueBytes() - case Some(pendingEdge) => - val opBytes = statusCodeWithOp(pendingEdge.statusCode, pendingEdge.op) - val versionBytes = Array.empty[Byte] - val propsBytes = propsToKeyValuesWithTs(pendingEdge.propsWithTs.toSeq) - val lockBytes = Bytes.toBytes(pendingEdge.lockTs.get) - Bytes.add(Bytes.add(valueBytes(), opBytes, versionBytes), Bytes.add(propsBytes, lockBytes)) - } - val kv = SKeyValue(table, row, cf, qualifier, value, snapshotEdge.version) - Seq(kv) - } -} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/vertex/VertexDeserializable.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/vertex/VertexDeserializable.scala b/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/vertex/VertexDeserializable.scala deleted file mode 100644 index e355401..0000000 --- a/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/vertex/VertexDeserializable.scala +++ /dev/null @@ -1,46 +0,0 @@ -package com.kakao.s2graph.core.storage.serde.vertex - -import com.kakao.s2graph.core.storage.{CanSKeyValue, Deserializable} -import com.kakao.s2graph.core.types.{InnerVal, InnerValLike, VertexId} -import com.kakao.s2graph.core.{QueryParam, Vertex} -import org.apache.hadoop.hbase.util.Bytes -import scala.collection.mutable.ListBuffer - -class VertexDeserializable extends Deserializable[Vertex] { - def fromKeyValuesInner[T: CanSKeyValue](queryParam: QueryParam, - _kvs: Seq[T], - version: String, - cacheElementOpt: Option[Vertex]): Vertex = { - - val kvs = _kvs.map { kv => implicitly[CanSKeyValue[T]].toSKeyValue(kv) } - - val kv = kvs.head - val (vertexId, _) = VertexId.fromBytes(kv.row, 0, kv.row.length, version) - - var maxTs = Long.MinValue - val propsMap = new collection.mutable.HashMap[Int, InnerValLike] - val belongLabelIds = new ListBuffer[Int] - - for { - kv <- kvs - } { - val propKey = - if (kv.qualifier.length == 1) kv.qualifier.head.toInt - else Bytes.toInt(kv.qualifier) - - val ts = kv.timestamp - if (ts > maxTs) maxTs = ts - - if (Vertex.isLabelId(propKey)) { - belongLabelIds += Vertex.toLabelId(propKey) - } else { - val v = kv.value - val (value, _) = InnerVal.fromBytes(v, 0, v.length, version) - propsMap += (propKey -> value) - } - } - assert(maxTs != Long.MinValue) - Vertex(vertexId, maxTs, propsMap.toMap, belongLabelIds = belongLabelIds) - } -} - http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/vertex/VertexSerializable.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/vertex/VertexSerializable.scala b/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/vertex/VertexSerializable.scala deleted file mode 100644 index 0c17592..0000000 --- a/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/vertex/VertexSerializable.scala +++ /dev/null @@ -1,20 +0,0 @@ -package com.kakao.s2graph.core.storage.serde.vertex - -import com.kakao.s2graph.core.Vertex -import com.kakao.s2graph.core.storage.{SKeyValue, Serializable} -import org.apache.hadoop.hbase.util.Bytes - - -case class VertexSerializable(vertex: Vertex) extends Serializable[Vertex] { - - val cf = Serializable.vertexCf - - override def toKeyValues: Seq[SKeyValue] = { - val row = vertex.id.bytes - val base = for ((k, v) <- vertex.props ++ vertex.defaultProps) yield Bytes.toBytes(k) -> v.bytes - val belongsTo = vertex.belongLabelIds.map { labelId => Bytes.toBytes(Vertex.toPropKey(labelId)) -> Array.empty[Byte] } - (base ++ belongsTo).map { case (qualifier, value) => - SKeyValue(vertex.hbaseTableName.getBytes, row, cf, qualifier, value, vertex.ts) - } toSeq - } -} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2core/src/main/scala/com/kakao/s2graph/core/types/HBaseType.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/types/HBaseType.scala b/s2core/src/main/scala/com/kakao/s2graph/core/types/HBaseType.scala deleted file mode 100644 index 4b3b3db..0000000 --- a/s2core/src/main/scala/com/kakao/s2graph/core/types/HBaseType.scala +++ /dev/null @@ -1,163 +0,0 @@ -package com.kakao.s2graph.core.types - -import org.apache.hadoop.hbase.util.Bytes - -/** - * Created by shon on 6/6/15. - */ -object HBaseType { - val VERSION4 = "v4" - val VERSION3 = "v3" - val VERSION2 = "v2" - val VERSION1 = "v1" -// val DEFAULT_VERSION = VERSION2 - val DEFAULT_VERSION = VERSION3 - val EMPTY_SEQ_BYTE = Byte.MaxValue - val DEFAULT_COL_ID = 0 - val bitsForDir = 2 - val maxBytes = Bytes.toBytes(Int.MaxValue) - val toSeqByte = -5.toByte - val defaultTgtVertexId = null -} - -object HBaseDeserializable { - - import HBaseType._ - - // 6 bits is used for index sequence so total index per label is limited to 2^6 - def bytesToLabelIndexSeqWithIsInverted(bytes: Array[Byte], offset: Int): (Byte, Boolean) = { - val byte = bytes(offset) - val isInverted = if ((byte & 1) != 0) true else false - val labelOrderSeq = byte >> 1 - (labelOrderSeq.toByte, isInverted) - } - - def bytesToKeyValues(bytes: Array[Byte], - offset: Int, - length: Int, - version: String = DEFAULT_VERSION): (Array[(Byte, InnerValLike)], Int) = { - var pos = offset - val len = bytes(pos) - pos += 1 - val kvs = new Array[(Byte, InnerValLike)](len) - var i = 0 - while (i < len) { - val k = bytes(pos) - pos += 1 - val (v, numOfBytesUsed) = InnerVal.fromBytes(bytes, pos, 0, version) - pos += numOfBytesUsed - kvs(i) = (k -> v) - i += 1 - } - val ret = (kvs, pos) - // logger.debug(s"bytesToProps: $ret") - ret - } - - def bytesToKeyValuesWithTs(bytes: Array[Byte], - offset: Int, - version: String = DEFAULT_VERSION): (Array[(Byte, InnerValLikeWithTs)], Int) = { - var pos = offset - val len = bytes(pos) - pos += 1 - val kvs = new Array[(Byte, InnerValLikeWithTs)](len) - var i = 0 - while (i < len) { - val k = bytes(pos) - pos += 1 - val (v, numOfBytesUsed) = InnerValLikeWithTs.fromBytes(bytes, pos, 0, version) - pos += numOfBytesUsed - kvs(i) = (k -> v) - i += 1 - } - val ret = (kvs, pos) - // logger.debug(s"bytesToProps: $ret") - ret - } - - def bytesToProps(bytes: Array[Byte], - offset: Int, - version: String = DEFAULT_VERSION): (Array[(Byte, InnerValLike)], Int) = { - var pos = offset - val len = bytes(pos) - pos += 1 - val kvs = new Array[(Byte, InnerValLike)](len) - var i = 0 - while (i < len) { - val k = EMPTY_SEQ_BYTE - val (v, numOfBytesUsed) = InnerVal.fromBytes(bytes, pos, 0, version) - pos += numOfBytesUsed - kvs(i) = (k -> v) - i += 1 - } - // logger.error(s"bytesToProps: $kvs") - val ret = (kvs, pos) - - ret - } -} - -object HBaseSerializable { - def propsToBytes(props: Seq[(Byte, InnerValLike)]): Array[Byte] = { - val len = props.length - assert(len < Byte.MaxValue) - var bytes = Array.fill(1)(len.toByte) - for ((k, v) <- props) bytes = Bytes.add(bytes, v.bytes) - bytes - } - - def propsToKeyValues(props: Seq[(Byte, InnerValLike)]): Array[Byte] = { - val len = props.length - assert(len < Byte.MaxValue) - var bytes = Array.fill(1)(len.toByte) - for ((k, v) <- props) bytes = Bytes.add(bytes, Array.fill(1)(k), v.bytes) - bytes - } - - def propsToKeyValuesWithTs(props: Seq[(Byte, InnerValLikeWithTs)]): Array[Byte] = { - val len = props.length - assert(len < Byte.MaxValue) - var bytes = Array.fill(1)(len.toByte) - for ((k, v) <- props) bytes = Bytes.add(bytes, Array.fill(1)(k), v.bytes) - bytes - } - - def labelOrderSeqWithIsInverted(labelOrderSeq: Byte, isInverted: Boolean): Array[Byte] = { - assert(labelOrderSeq < (1 << 6)) - val byte = labelOrderSeq << 1 | (if (isInverted) 1 else 0) - Array.fill(1)(byte.toByte) - } -} - -trait HBaseSerializable { - def bytes: Array[Byte] -} - -trait HBaseDeserializable { - - import HBaseType._ - - def fromBytes(bytes: Array[Byte], - offset: Int, - len: Int, - version: String = DEFAULT_VERSION): (HBaseSerializable, Int) - - // def fromBytesWithIndex(bytes: Array[Byte], - // offset: Int, - // len: Int, - // version: String = DEFAULT_VERSION): (HBaseSerializable, Int) - def notSupportedEx(version: String) = new RuntimeException(s"not supported version, $version") -} - -trait HBaseDeserializableWithIsVertexId { - - import HBaseType._ - - def fromBytes(bytes: Array[Byte], - offset: Int, - len: Int, - version: String = DEFAULT_VERSION, - isVertexId: Boolean = false): (HBaseSerializable, Int) - - def notSupportedEx(version: String) = new RuntimeException(s"not supported version, $version") -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2core/src/main/scala/com/kakao/s2graph/core/types/InnerValLike.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/types/InnerValLike.scala b/s2core/src/main/scala/com/kakao/s2graph/core/types/InnerValLike.scala deleted file mode 100644 index 8146f32..0000000 --- a/s2core/src/main/scala/com/kakao/s2graph/core/types/InnerValLike.scala +++ /dev/null @@ -1,246 +0,0 @@ -package com.kakao.s2graph.core.types - -import org.apache.hadoop.hbase.util._ - -/** - * Created by shon on 6/6/15. - */ -object InnerVal extends HBaseDeserializableWithIsVertexId { - import HBaseType._ - - val order = Order.DESCENDING - val stringLenOffset = 7.toByte - val maxStringLen = Byte.MaxValue - stringLenOffset - val maxMetaByte = Byte.MaxValue - val minMetaByte = 0.toByte - - /** supported data type */ - val BLOB = "blob" - val STRING = "string" - val DOUBLE = "double" - val FLOAT = "float" - val LONG = "long" - val INT = "integer" - val SHORT = "short" - val BYTE = "byte" - val NUMERICS = List(DOUBLE, FLOAT, LONG, INT, SHORT, BYTE) - val BOOLEAN = "boolean" - - def isNumericType(dataType: String): Boolean = { - dataType match { - case InnerVal.BYTE | InnerVal.SHORT | InnerVal.INT | InnerVal.LONG | InnerVal.FLOAT | InnerVal.DOUBLE => true - case _ => false - } - } - def toInnerDataType(dataType: String): String = { - dataType match { - case "blob" => BLOB - case "string" | "str" | "s" => STRING - case "double" | "d" | "float64" => DOUBLE - case "float" | "f" | "float32" => FLOAT - case "long" | "l" | "int64" | "integer64" => LONG - case "int" | "integer" | "i" | "int32" | "integer32" => INT - case "short" | "int16" | "integer16" => SHORT - case "byte" | "b" | "tinyint" | "int8" | "integer8" => BYTE - case "boolean" | "bool" => BOOLEAN - case _ => throw new RuntimeException(s"can`t convert $dataType into InnerDataType") - } - } - - def numByteRange(num: BigDecimal) = { -// val byteLen = -// if (num.isValidByte | num.isValidChar) 1 -// else if (num.isValidShort) 2 -// else if (num.isValidInt) 4 -// else if (num.isValidLong) 8 -// else if (num.isValidFloat) 4 -// else 12 - val byteLen = 12 - // else throw new RuntimeException(s"wrong data $num") - new SimplePositionedMutableByteRange(byteLen + 4) - } - - def fromBytes(bytes: Array[Byte], - offset: Int, - len: Int, - version: String, - isVertexId: Boolean): (InnerValLike, Int) = { - version match { - case VERSION2 | VERSION3 | VERSION4 => v2.InnerVal.fromBytes(bytes, offset, len, version, isVertexId) - case VERSION1 => v1.InnerVal.fromBytes(bytes, offset, len, version, isVertexId) - case _ => throw notSupportedEx(version) - } - } - - def withLong(l: Long, version: String): InnerValLike = { - version match { - case VERSION2 | VERSION3 | VERSION4 => v2.InnerVal(BigDecimal(l)) - case VERSION1 => v1.InnerVal(Some(l), None, None) - case _ => throw notSupportedEx(version) - } - } - - def withInt(i: Int, version: String): InnerValLike = { - version match { - case VERSION2 | VERSION3 | VERSION4 => v2.InnerVal(BigDecimal(i)) - case VERSION1 => v1.InnerVal(Some(i.toLong), None, None) - case _ => throw notSupportedEx(version) - } - } - - def withFloat(f: Float, version: String): InnerValLike = { - version match { - case VERSION2 | VERSION3 | VERSION4 => v2.InnerVal(BigDecimal(f.toDouble)) - case VERSION1 => v1.InnerVal(Some(f.toLong), None, None) - case _ => throw notSupportedEx(version) - } - } - - def withDouble(d: Double, version: String): InnerValLike = { - version match { - case VERSION2 | VERSION3 | VERSION4 => v2.InnerVal(BigDecimal(d)) - case VERSION1 => v1.InnerVal(Some(d.toLong), None, None) - case _ => throw notSupportedEx(version) - } - } - - def withNumber(num: BigDecimal, version: String): InnerValLike = { - version match { - case VERSION2 | VERSION3 | VERSION4 => v2.InnerVal(num) - case VERSION1 => v1.InnerVal(Some(num.toLong), None, None) - case _ => throw notSupportedEx(version) - } - } - - def withBoolean(b: Boolean, version: String): InnerValLike = { - version match { - case VERSION2 | VERSION3 | VERSION4 => v2.InnerVal(b) - case VERSION1 => v1.InnerVal(None, None, Some(b)) - case _ => throw notSupportedEx(version) - } - } - - def withBlob(blob: Array[Byte], version: String): InnerValLike = { - version match { - case VERSION2 | VERSION3 | VERSION4 => v2.InnerVal(blob) - case _ => throw notSupportedEx(version) - } - } - - def withStr(s: String, version: String): InnerValLike = { - version match { - case VERSION2 | VERSION3 | VERSION4 => v2.InnerVal(s) - case VERSION1 => v1.InnerVal(None, Some(s), None) - case _ => throw notSupportedEx(version) - } - } - -// def withInnerVal(innerVal: InnerValLike, version: String): InnerValLike = { -// val bytes = innerVal.bytes -// version match { -// case VERSION2 => v2.InnerVal.fromBytes(bytes, 0, bytes.length, version)._1 -// case VERSION1 => v1.InnerVal.fromBytes(bytes, 0, bytes.length, version)._1 -// case _ => throw notSupportedEx(version) -// } -// } - - /** nasty implementation for backward compatability */ - def convertVersion(innerVal: InnerValLike, dataType: String, toVersion: String): InnerValLike = { - val ret = toVersion match { - case VERSION2 | VERSION3 | VERSION4 => - if (innerVal.isInstanceOf[v1.InnerVal]) { - val obj = innerVal.asInstanceOf[v1.InnerVal] - obj.valueType match { - case "long" => InnerVal.withLong(obj.longV.get, toVersion) - case "string" => InnerVal.withStr(obj.strV.get, toVersion) - case "boolean" => InnerVal.withBoolean(obj.boolV.get, toVersion) - case _ => throw new Exception(s"InnerVal should be [long/integeer/short/byte/string/boolean]") - } - } else { - innerVal - } - case VERSION1 => - if (innerVal.isInstanceOf[v2.InnerVal]) { - val obj = innerVal.asInstanceOf[v2.InnerVal] - obj.value match { - case str: String => InnerVal.withStr(str, toVersion) - case b: Boolean => InnerVal.withBoolean(b, toVersion) - case n: BigDecimal => InnerVal.withNumber(n, toVersion) - case n: Long => InnerVal.withNumber(n, toVersion) - case n: Double => InnerVal.withNumber(n, toVersion) - case n: Int => InnerVal.withNumber(n, toVersion) - case _ => throw notSupportedEx(s"v2 to v1: $obj -> $toVersion") - } - } else { - innerVal - } - case _ => throw notSupportedEx(toVersion) - } -// logger.debug(s"convertVersion: $innerVal, $dataType, $toVersion, $ret, ${innerVal.bytes.toList}, ${ret.bytes.toList}") - ret - } - -} - -trait InnerValLike extends HBaseSerializable { - - val value: Any - - def compare(other: InnerValLike): Int - - def +(other: InnerValLike): InnerValLike - - def <(other: InnerValLike) = this.compare(other) < 0 - - def <=(other: InnerValLike) = this.compare(other) <= 0 - - def >(other: InnerValLike) = this.compare(other) > 0 - - def >=(other: InnerValLike) = this.compare(other) >= 0 - - override def toString(): String = value.toString - - override def hashCode(): Int = value.hashCode() - - override def equals(obj: Any): Boolean = { - obj match { - case other: InnerValLike => - val ret = toString == obj.toString -// logger.debug(s"InnerValLike.equals($this, $obj) => $ret") - ret - case _ => false - } - } - def hashKey(dataType: String): Int - - def toIdString(): String - -} - -object InnerValLikeWithTs extends HBaseDeserializable { - import HBaseType._ - def fromBytes(bytes: Array[Byte], - offset: Int, - len: Int, - version: String = DEFAULT_VERSION): (InnerValLikeWithTs, Int) = { - val (innerVal, numOfBytesUsed) = InnerVal.fromBytes(bytes, offset, len, version) - val ts = Bytes.toLong(bytes, offset + numOfBytesUsed) - (InnerValLikeWithTs(innerVal, ts), numOfBytesUsed + 8) - } - - def withLong(l: Long, ts: Long, version: String): InnerValLikeWithTs = { - InnerValLikeWithTs(InnerVal.withLong(l, version), ts) - } - - def withStr(s: String, ts: Long, version: String): InnerValLikeWithTs = { - InnerValLikeWithTs(InnerVal.withStr(s, version), ts) - } -} - -case class InnerValLikeWithTs(innerVal: InnerValLike, ts: Long) - extends HBaseSerializable { - - def bytes: Array[Byte] = { - Bytes.add(innerVal.bytes, Bytes.toBytes(ts)) - } -} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2core/src/main/scala/com/kakao/s2graph/core/types/LabelWithDirection.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/types/LabelWithDirection.scala b/s2core/src/main/scala/com/kakao/s2graph/core/types/LabelWithDirection.scala deleted file mode 100644 index 6207ad7..0000000 --- a/s2core/src/main/scala/com/kakao/s2graph/core/types/LabelWithDirection.scala +++ /dev/null @@ -1,65 +0,0 @@ -package com.kakao.s2graph.core.types - -import com.kakao.s2graph.core.GraphUtil -import com.kakao.s2graph.core.utils.logger -import org.apache.hadoop.hbase.util.Bytes - -/** - * Created by shon on 6/6/15. - */ -object LabelWithDirection { - - import HBaseType._ - - def apply(compositeInt: Int): LabelWithDirection = { - // logger.debug(s"CompositeInt: $compositeInt") - - val dir = compositeInt & ((1 << bitsForDir) - 1) - val labelId = compositeInt >> bitsForDir - LabelWithDirection(labelId, dir) - } - - def labelOrderSeqWithIsInverted(labelOrderSeq: Byte, isInverted: Boolean): Array[Byte] = { - assert(labelOrderSeq < (1 << 6)) - val byte = labelOrderSeq << 1 | (if (isInverted) 1 else 0) - Array.fill(1)(byte.toByte) - } - - def bytesToLabelIndexSeqWithIsInverted(bytes: Array[Byte], offset: Int): (Byte, Boolean) = { - val byte = bytes(offset) - val isInverted = if ((byte & 1) != 0) true else false - val labelOrderSeq = byte >> 1 - (labelOrderSeq.toByte, isInverted) - } -} - -case class LabelWithDirection(labelId: Int, dir: Int) extends HBaseSerializable { - - import HBaseType._ - - assert(dir < (1 << bitsForDir)) - assert(labelId < (Int.MaxValue >> bitsForDir)) - - lazy val labelBits = labelId << bitsForDir - - lazy val compositeInt = labelBits | dir - - def bytes = { - Bytes.toBytes(compositeInt) - } - - lazy val dirToggled = LabelWithDirection(labelId, GraphUtil.toggleDir(dir)) - - def updateDir(newDir: Int) = LabelWithDirection(labelId, newDir) - - def isDirected = dir == 0 || dir == 1 - - override def hashCode(): Int = compositeInt - - override def equals(other: Any): Boolean = { - other match { - case o: LabelWithDirection => hashCode == o.hashCode() - case _ => false - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2core/src/main/scala/com/kakao/s2graph/core/types/VertexId.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/types/VertexId.scala b/s2core/src/main/scala/com/kakao/s2graph/core/types/VertexId.scala deleted file mode 100644 index 13637bf..0000000 --- a/s2core/src/main/scala/com/kakao/s2graph/core/types/VertexId.scala +++ /dev/null @@ -1,145 +0,0 @@ -package com.kakao.s2graph.core.types - -import com.kakao.s2graph.core.GraphUtil -import com.kakao.s2graph.core.types.HBaseType._ -import org.apache.hadoop.hbase.util.Bytes - -/** - * Created by shon on 6/10/15. - */ -object VertexId extends HBaseDeserializable { - import HBaseType._ - def fromBytes(bytes: Array[Byte], - offset: Int, - len: Int, - version: String = DEFAULT_VERSION): (VertexId, Int) = { - /** since murmur hash is prepended, skip numOfBytes for murmur hash */ - var pos = offset + GraphUtil.bytesForMurMurHash - - val (innerId, numOfBytesUsed) = InnerVal.fromBytes(bytes, pos, len, version, isVertexId = true) - pos += numOfBytesUsed - val colId = Bytes.toInt(bytes, pos, 4) - (VertexId(colId, innerId), GraphUtil.bytesForMurMurHash + numOfBytesUsed + 4) - } - - def apply(colId: Int, innerId: InnerValLike): VertexId = new VertexId(colId, innerId) - - def toSourceVertexId(vid: VertexId) = { - SourceVertexId(vid.colId, vid.innerId) - } - - def toTargetVertexId(vid: VertexId) = { - TargetVertexId(vid.colId, vid.innerId) - } -} - -class VertexId protected (val colId: Int, val innerId: InnerValLike) extends HBaseSerializable { - val storeHash: Boolean = true - val storeColId: Boolean = true - lazy val hashBytes = -// if (storeHash) Bytes.toBytes(GraphUtil.murmur3(innerId.toString)) - if (storeHash) Bytes.toBytes(GraphUtil.murmur3(innerId.toIdString())) - else Array.empty[Byte] - - lazy val colIdBytes: Array[Byte] = - if (storeColId) Bytes.toBytes(colId) - else Array.empty[Byte] - - def bytes: Array[Byte] = Bytes.add(hashBytes, innerId.bytes, colIdBytes) - - override def toString(): String = { - colId.toString() + "," + innerId.toString() -// s"VertexId($colId, $innerId)" - } - - override def hashCode(): Int = { - val ret = if (storeColId) { - colId * 31 + innerId.hashCode() - } else { - innerId.hashCode() - } -// logger.debug(s"VertexId.hashCode: $ret") - ret - } - override def equals(obj: Any): Boolean = { - val ret = obj match { - case other: VertexId => colId == other.colId && innerId.toIdString() == other.innerId.toIdString() - case _ => false - } -// logger.debug(s"VertexId.equals: $this, $obj => $ret") - ret - } - - def compareTo(other: VertexId): Int = { - Bytes.compareTo(bytes, other.bytes) - } - def <(other: VertexId): Boolean = compareTo(other) < 0 - def <=(other: VertexId): Boolean = compareTo(other) <= 0 - def >(other: VertexId): Boolean = compareTo(other) > 0 - def >=(other: VertexId): Boolean = compareTo(other) >= 0 -} - -object SourceVertexId extends HBaseDeserializable { - import HBaseType._ - def fromBytes(bytes: Array[Byte], - offset: Int, - len: Int, - version: String = DEFAULT_VERSION): (VertexId, Int) = { - /** since murmur hash is prepended, skip numOfBytes for murmur hash */ - val pos = offset + GraphUtil.bytesForMurMurHash - val (innerId, numOfBytesUsed) = InnerVal.fromBytes(bytes, pos, len, version, isVertexId = true) - - (SourceVertexId(DEFAULT_COL_ID, innerId), GraphUtil.bytesForMurMurHash + numOfBytesUsed) - } - -} - - -case class SourceVertexId(override val colId: Int, - override val innerId: InnerValLike) extends VertexId(colId, innerId) { - override val storeColId: Boolean = false -} - -object TargetVertexId extends HBaseDeserializable { - import HBaseType._ - def fromBytes(bytes: Array[Byte], - offset: Int, - len: Int, - version: String = DEFAULT_VERSION): (VertexId, Int) = { - /** murmur has is not prepended so start from offset */ - val (innerId, numOfBytesUsed) = InnerVal.fromBytes(bytes, offset, len, version, isVertexId = true) - (TargetVertexId(DEFAULT_COL_ID, innerId), numOfBytesUsed) - } -} - -case class TargetVertexId(override val colId: Int, - override val innerId: InnerValLike) - extends VertexId(colId, innerId) { - override val storeColId: Boolean = false - override val storeHash: Boolean = false - -} - -object SourceAndTargetVertexIdPair extends HBaseDeserializable { - val delimiter = ":" - import HBaseType._ - def fromBytes(bytes: Array[Byte], - offset: Int, - len: Int, - version: String = DEFAULT_VERSION): (SourceAndTargetVertexIdPair, Int) = { - val pos = offset + GraphUtil.bytesForMurMurHash - val (srcId, srcBytesLen) = InnerVal.fromBytes(bytes, pos, len, version, isVertexId = true) - val (tgtId, tgtBytesLen) = InnerVal.fromBytes(bytes, pos + srcBytesLen, len, version, isVertexId = true) - (SourceAndTargetVertexIdPair(srcId, tgtId), GraphUtil.bytesForMurMurHash + srcBytesLen + tgtBytesLen) - } -} - -case class SourceAndTargetVertexIdPair(val srcInnerId: InnerValLike, val tgtInnerId: InnerValLike) extends HBaseSerializable { - val colId = DEFAULT_COL_ID - import SourceAndTargetVertexIdPair._ - override def bytes: Array[Byte] = { - val hashBytes = Bytes.toBytes(GraphUtil.murmur3(srcInnerId + delimiter + tgtInnerId)) - Bytes.add(hashBytes, srcInnerId.bytes, tgtInnerId.bytes) - } -} - http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2core/src/main/scala/com/kakao/s2graph/core/types/v1/InnerVal.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/types/v1/InnerVal.scala b/s2core/src/main/scala/com/kakao/s2graph/core/types/v1/InnerVal.scala deleted file mode 100644 index 5111cbc..0000000 --- a/s2core/src/main/scala/com/kakao/s2graph/core/types/v1/InnerVal.scala +++ /dev/null @@ -1,226 +0,0 @@ -package com.kakao.s2graph.core.types.v1 - -import com.kakao.s2graph.core.GraphExceptions.IllegalDataTypeException -import com.kakao.s2graph.core.types._ -import org.apache.hadoop.hbase.util.Bytes - -/** - * Created by shon on 6/6/15. - */ -object InnerVal extends HBaseDeserializableWithIsVertexId { - import HBaseType._ - // val defaultVal = new InnerVal(None, None, None) - val stringLenOffset = 7.toByte - val maxStringLen = Byte.MaxValue - stringLenOffset - val maxMetaByte = Byte.MaxValue - val minMetaByte = 0.toByte - /** - * first byte encoding rule. - * 0 => default - * 1 => long - * 2 => int - * 3 => short - * 4 => byte - * 5 => true - * 6 => false - * 7 ~ 127 => string len + 7 - */ - val metaByte = Map("default" -> 0, "long" -> 1, "int" -> 2, "short" -> 3, - "byte" -> 4, "true" -> 5, "false" -> 6).map { - case (k, v) => (k, v.toByte) - } - val metaByteRev = metaByte.map { case (k, v) => (v.toByte, k) } ++ metaByte.map { case (k, v) => ((-v).toByte, k) } - - def maxIdVal(dataType: String) = { - dataType match { - case "string" => InnerVal.withStr((0 until (Byte.MaxValue - stringLenOffset)).map("~").mkString) - case "long" => InnerVal.withLong(Long.MaxValue) - case "bool" => InnerVal.withBoolean(true) - case _ => throw IllegalDataTypeException(dataType) - } - } - - def minIdVal(dataType: String) = { - dataType match { - case "string" => InnerVal.withStr("") - case "long" => InnerVal.withLong(1) - case "bool" => InnerVal.withBoolean(false) - case _ => throw IllegalDataTypeException(dataType) - } - } - - def fromBytes(bytes: Array[Byte], offset: Int, len: Int, version: String = DEFAULT_VERSION, isVertexId: Boolean = false): (InnerVal, Int) = { - var pos = offset - // - val header = bytes(pos) - // logger.debug(s"${bytes(offset)}: ${bytes.toList.slice(pos, bytes.length)}") - pos += 1 - - var numOfBytesUsed = 0 - val (longV, strV, boolV) = metaByteRev.get(header) match { - case Some(s) => - s match { - case "default" => - (None, None, None) - case "true" => - numOfBytesUsed = 0 - (None, None, Some(true)) - case "false" => - numOfBytesUsed = 0 - (None, None, Some(false)) - case "byte" => - numOfBytesUsed = 1 - val b = bytes(pos) - val value = if (b >= 0) Byte.MaxValue - b else Byte.MinValue - b - 1 - (Some(value.toLong), None, None) - case "short" => - numOfBytesUsed = 2 - val b = Bytes.toShort(bytes, pos, 2) - val value = if (b >= 0) Short.MaxValue - b else Short.MinValue - b - 1 - (Some(value.toLong), None, None) - case "int" => - numOfBytesUsed = 4 - val b = Bytes.toInt(bytes, pos, 4) - val value = if (b >= 0) Int.MaxValue - b else Int.MinValue - b - 1 - (Some(value.toLong), None, None) - case "long" => - numOfBytesUsed = 8 - val b = Bytes.toLong(bytes, pos, 8) - val value = if (b >= 0) Long.MaxValue - b else Long.MinValue - b - 1 - (Some(value.toLong), None, None) - } - case _ => // string - val strLen = header - stringLenOffset - numOfBytesUsed = strLen - (None, Some(Bytes.toString(bytes, pos, strLen)), None) - } - - (InnerVal(longV, strV, boolV), numOfBytesUsed + 1) - } - - def withLong(l: Long): InnerVal = { - // if (l < 0) throw new IllegalDataRangeException("value shoudl be >= 0") - InnerVal(Some(l), None, None) - } - - def withStr(s: String): InnerVal = { - InnerVal(None, Some(s), None) - } - - def withBoolean(b: Boolean): InnerVal = { - InnerVal(None, None, Some(b)) - } - - /** - * In natural order - * -129, -128 , -2, -1 < 0 < 1, 2, 127, 128 - * - * In byte order - * 0 < 1, 2, 127, 128 < -129, -128, -2, -1 - * - */ - def transform(l: Long): (Byte, Array[Byte]) = { - if (Byte.MinValue <= l && l <= Byte.MaxValue) { - // val value = if (l < 0) l - Byte.MinValue else l + Byte.MinValue - val key = if (l >= 0) metaByte("byte") else -metaByte("byte") - val value = if (l >= 0) Byte.MaxValue - l else Byte.MinValue - l - 1 - val valueBytes = Array.fill(1)(value.toByte) - (key.toByte, valueBytes) - } else if (Short.MinValue <= l && l <= Short.MaxValue) { - val key = if (l >= 0) metaByte("short") else -metaByte("short") - val value = if (l >= 0) Short.MaxValue - l else Short.MinValue - l - 1 - val valueBytes = Bytes.toBytes(value.toShort) - (key.toByte, valueBytes) - } else if (Int.MinValue <= l && l <= Int.MaxValue) { - val key = if (l >= 0) metaByte("int") else -metaByte("int") - val value = if (l >= 0) Int.MaxValue - l else Int.MinValue - l - 1 - val valueBytes = Bytes.toBytes(value.toInt) - (key.toByte, valueBytes) - } else if (Long.MinValue <= l && l <= Long.MaxValue) { - val key = if (l >= 0) metaByte("long") else -metaByte("long") - val value = if (l >= 0) Long.MaxValue - l else Long.MinValue - l - 1 - val valueBytes = Bytes.toBytes(value.toLong) - (key.toByte, valueBytes) - } else { - throw new Exception(s"InnerVal range is out: $l") - } - } -} - -case class InnerVal(longV: Option[Long], strV: Option[String], boolV: Option[Boolean]) - extends HBaseSerializable with InnerValLike { - - import InnerVal._ - - lazy val isDefault = longV.isEmpty && strV.isEmpty && boolV.isEmpty - val value = (longV, strV, boolV) match { - case (Some(l), None, None) => l - case (None, Some(s), None) => s - case (None, None, Some(b)) => b - case _ => throw new Exception(s"InnerVal should be [long/integeer/short/byte/string/boolean]") - } - def valueType = (longV, strV, boolV) match { - case (Some(l), None, None) => "long" - case (None, Some(s), None) => "string" - case (None, None, Some(b)) => "boolean" - case _ => throw new Exception(s"InnerVal should be [long/integeer/short/byte/string/boolean]") - } - - def compare(other: InnerValLike): Int = { - if (!other.isInstanceOf[InnerVal]) { - throw new RuntimeException(s"compare between $this vs $other is not supported") - } else { -// (value, other.value) match { -// case (v1: Long, v2: Long) => v1.compare(v2) -// case (b1: Boolean, b2: Boolean) => b1.compare(b2) -// case (s1: String, s2: String) => s1.compare(s2) -// case _ => throw new Exception("Please check a type of the compare operands") -// } - Bytes.compareTo(bytes, other.bytes) * -1 - } - } - - def +(other: InnerValLike) = { - if (!other.isInstanceOf[InnerVal]) { - throw new RuntimeException(s"+ between $this vs $other is not supported") - } else { - (value, other.value) match { - case (v1: Long, v2: Long) => InnerVal.withLong(v1 + v2) - case (b1: Boolean, b2: Boolean) => InnerVal.withBoolean(if (b2) !b1 else b1) - case _ => throw new Exception("Please check a type of the incr operands") - } - } - } - - def bytes = { - val (meta, valBytes) = (longV, strV, boolV) match { - case (None, None, None) => - (metaByte("default"), Array.empty[Byte]) - case (Some(l), None, None) => - transform(l) - case (None, None, Some(b)) => - val meta = if (b) metaByte("true") else metaByte("false") - (meta, Array.empty[Byte]) - case (None, Some(s), None) => - val sBytes = Bytes.toBytes(s) - if (sBytes.length > maxStringLen) { - throw new IllegalDataTypeException(s"string in innerVal maxSize is $maxStringLen, given ${sBytes.length}") - } - assert(sBytes.length <= maxStringLen) - val meta = (stringLenOffset + sBytes.length).toByte - (meta, sBytes) - case _ => throw new IllegalDataTypeException("innerVal data type should be [long/string/bool]") - } - Bytes.add(Array.fill(1)(meta.toByte), valBytes) - } - - override def toString(): String = { - value.toString - } - override def hashKey(dataType: String): Int = { - value.toString.hashCode() - } - override def toIdString(): String = { - value.toString - } -} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2core/src/main/scala/com/kakao/s2graph/core/types/v2/InnerVal.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/types/v2/InnerVal.scala b/s2core/src/main/scala/com/kakao/s2graph/core/types/v2/InnerVal.scala deleted file mode 100644 index 630e53d..0000000 --- a/s2core/src/main/scala/com/kakao/s2graph/core/types/v2/InnerVal.scala +++ /dev/null @@ -1,162 +0,0 @@ -package com.kakao.s2graph.core.types.v2 - -import com.kakao.s2graph.core.types._ -import org.apache.hadoop.hbase.util._ - -/** - * Created by shon on 6/6/15. - */ -object InnerVal extends HBaseDeserializableWithIsVertexId { - - import HBaseType._ - - val order = Order.DESCENDING - - def fromBytes(bytes: Array[Byte], - offset: Int, - len: Int, - version: String = DEFAULT_VERSION, - isVertexId: Boolean = false): (InnerVal, Int) = { - val pbr = new SimplePositionedByteRange(bytes) - pbr.setPosition(offset) - val startPos = pbr.getPosition - if (bytes(offset) == -1 | bytes(offset) == 0) { - /** simple boolean */ - val boolean = order match { - case Order.DESCENDING => bytes(offset) == 0 - case _ => bytes(offset) == -1 - } - (InnerVal(boolean), 1) - } - else { - if (OrderedBytes.isNumeric(pbr)) { - val numeric = OrderedBytes.decodeNumericAsBigDecimal(pbr) - if (isVertexId) (InnerVal(numeric.longValue()), pbr.getPosition - startPos) - else (InnerVal(BigDecimal(numeric)), pbr.getPosition - startPos) -// (InnerVal(numeric.doubleValue()), pbr.getPosition - startPos) -// (InnerVal(BigDecimal(numeric)), pbr.getPosition - startPos) - } else if (OrderedBytes.isText(pbr)) { - val str = OrderedBytes.decodeString(pbr) - (InnerVal(str), pbr.getPosition - startPos) - } else if (OrderedBytes.isBlobVar(pbr)) { - val blobVar = OrderedBytes.decodeBlobVar(pbr) - (InnerVal(blobVar), pbr.getPosition - startPos) - } else { - throw new RuntimeException("!!") - } - } - } -} - -case class InnerVal(value: Any) extends HBaseSerializable with InnerValLike { - - import com.kakao.s2graph.core.types.InnerVal._ - - def bytes: Array[Byte] = { - val ret = value match { - case b: Boolean => - - /** since OrderedBytes header start from 0x05, it is safe to use -1, 0 - * for decreasing order (true, false) */ - // Bytes.toBytes(b) - order match { - case Order.DESCENDING => if (b) Array(0.toByte) else Array(-1.toByte) - case _ => if (!b) Array(0.toByte) else Array(-1.toByte) - } - case d: Double => - val num = BigDecimal(d) - val pbr = numByteRange(num) - val len = OrderedBytes.encodeNumeric(pbr, num.bigDecimal, order) - pbr.getBytes().take(len) - case l: Long => - val num = BigDecimal(l) - val pbr = numByteRange(num) - val len = OrderedBytes.encodeNumeric(pbr, num.bigDecimal, order) - pbr.getBytes().take(len) - case i: Int => - val num = BigDecimal(i) - val pbr = numByteRange(num) - val len = OrderedBytes.encodeNumeric(pbr, num.bigDecimal, order) - pbr.getBytes().take(len) - case sh: Short => - val num = BigDecimal(sh) - val pbr = numByteRange(num) - val len = OrderedBytes.encodeNumeric(pbr, num.bigDecimal, order) - pbr.getBytes().take(len) - case b: Byte => - val num = BigDecimal(b) - val pbr = numByteRange(num) - val len = OrderedBytes.encodeNumeric(pbr, num.bigDecimal, order) - pbr.getBytes().take(len) - - - case b: BigDecimal => - val pbr = numByteRange(b) - val len = OrderedBytes.encodeNumeric(pbr, b.bigDecimal, order) - pbr.getBytes().take(len) - case s: String => - val pbr = new SimplePositionedMutableByteRange(s.getBytes.length + 3) - val len = OrderedBytes.encodeString(pbr, s, order) - pbr.getBytes().take(len) - case blob: Array[Byte] => - val len = OrderedBytes.blobVarEncodedLength(blob.length) - val pbr = new SimplePositionedMutableByteRange(len) - val totalLen = OrderedBytes.encodeBlobVar(pbr, blob, order) - pbr.getBytes().take(totalLen) - } - // println(s"$value => ${ret.toList}, ${ret.length}") - ret - } - -// - override def hashKey(dataType: String): Int = { - if (value.isInstanceOf[String]) { - // since we use dummy stringn value for degree edge. - value.toString.hashCode() - } else { - dataType match { - case BYTE => value.asInstanceOf[BigDecimal].bigDecimal.byteValue().hashCode() - case FLOAT => value.asInstanceOf[BigDecimal].bigDecimal.floatValue().hashCode() - case DOUBLE => value.asInstanceOf[BigDecimal].bigDecimal.doubleValue().hashCode() - case LONG => value.asInstanceOf[BigDecimal].bigDecimal.longValue().hashCode() - case INT => value.asInstanceOf[BigDecimal].bigDecimal.intValue().hashCode() - case SHORT => value.asInstanceOf[BigDecimal].bigDecimal.shortValue().hashCode() - case STRING => value.toString.hashCode - case _ => throw new RuntimeException(s"NotSupportede type: $dataType") - } - } - } - - def compare(other: InnerValLike): Int = { - if (!other.isInstanceOf[InnerValLike]) - throw new RuntimeException(s"compare $this vs $other") - Bytes.compareTo(bytes, other.bytes) * -1 - } - - def +(other: InnerValLike): InnerValLike = { - if (!other.isInstanceOf[InnerValLike]) - throw new RuntimeException(s"+ $this, $other") - - (value, other.value) match { - case (v1: BigDecimal, v2: BigDecimal) => new InnerVal(BigDecimal(v1.bigDecimal.add(v2.bigDecimal))) - case _ => throw new RuntimeException("+ operation on inner val is for big decimal pair") - } - } - - //need to be removed ?? - override def toString(): String = { -// value.toString() - value match { - case n: BigDecimal => n.bigDecimal.toPlainString - case _ => value.toString - } - } - - override def toIdString(): String = { - value match { - case n: BigDecimal => n.bigDecimal.longValue().toString() - case _ => value.toString - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2core/src/main/scala/com/kakao/s2graph/core/utils/DeferCache.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/utils/DeferCache.scala b/s2core/src/main/scala/com/kakao/s2graph/core/utils/DeferCache.scala deleted file mode 100644 index 6777c28..0000000 --- a/s2core/src/main/scala/com/kakao/s2graph/core/utils/DeferCache.scala +++ /dev/null @@ -1,82 +0,0 @@ -package com.kakao.s2graph.core.utils - -import java.util.concurrent.TimeUnit - -import com.google.common.cache.CacheBuilder -import com.stumbleupon.async.Deferred -import com.typesafe.config.Config - -import scala.concurrent.ExecutionContext - -class DeferCache[R](config: Config)(implicit ex: ExecutionContext) { - - import com.kakao.s2graph.core.utils.Extensions.DeferOps - - type Value = (Long, Deferred[R]) - - private val maxSize = config.getInt("future.cache.max.size") - private val expireAfterWrite = config.getInt("future.cache.expire.after.write") - private val expireAfterAccess = config.getInt("future.cache.expire.after.access") - - private val futureCache = CacheBuilder.newBuilder() - .initialCapacity(maxSize) - .concurrencyLevel(Runtime.getRuntime.availableProcessors()) - .expireAfterWrite(expireAfterWrite, TimeUnit.MILLISECONDS) - .expireAfterAccess(expireAfterAccess, TimeUnit.MILLISECONDS) - .maximumSize(maxSize).build[java.lang.Long, (Long, Deferred[R])]() - - - def asMap() = futureCache.asMap() - - def getIfPresent(cacheKey: Long): Value = futureCache.getIfPresent(cacheKey) - - private def checkAndExpire(cacheKey: Long, - cachedAt: Long, - cacheTTL: Long, - oldDefer: Deferred[R])(op: => Deferred[R]): Deferred[R] = { - if (System.currentTimeMillis() >= cachedAt + cacheTTL) { - // future is too old. so need to expire and fetch new data from storage. - futureCache.asMap().remove(cacheKey) - - val newPromise = new Deferred[R]() - val now = System.currentTimeMillis() - - futureCache.asMap().putIfAbsent(cacheKey, (now, newPromise)) match { - case null => - // only one thread succeed to come here concurrently - // initiate fetch to storage then add callback on complete to finish promise. - op withCallback { value => - newPromise.callback(value) - value - } - newPromise - case (cachedAt, oldDefer) => oldDefer - } - } else { - // future is not to old so reuse it. - oldDefer - } - } - def getOrElseUpdate(cacheKey: Long, cacheTTL: Long)(op: => Deferred[R]): Deferred[R] = { - val cacheVal = futureCache.getIfPresent(cacheKey) - cacheVal match { - case null => - val promise = new Deferred[R]() - val now = System.currentTimeMillis() - val (cachedAt, defer) = futureCache.asMap().putIfAbsent(cacheKey, (now, promise)) match { - case null => - op.withCallback { value => - promise.callback(value) - value - } - (now, promise) - case oldVal => oldVal - } - checkAndExpire(cacheKey, cacheTTL, cachedAt, defer)(op) - - case (cachedAt, defer) => - checkAndExpire(cacheKey, cacheTTL, cachedAt, defer)(op) - } - } -} - http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2core/src/main/scala/com/kakao/s2graph/core/utils/Extentions.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/utils/Extentions.scala b/s2core/src/main/scala/com/kakao/s2graph/core/utils/Extentions.scala deleted file mode 100644 index eea9a79..0000000 --- a/s2core/src/main/scala/com/kakao/s2graph/core/utils/Extentions.scala +++ /dev/null @@ -1,72 +0,0 @@ -package com.kakao.s2graph.core.utils - -import com.stumbleupon.async.{Callback, Deferred} -import com.typesafe.config.Config -import scala.concurrent.{ExecutionContext, Future, Promise} - -object Extensions { - - - def retryOnSuccess[T](maxRetryNum: Int, n: Int = 1)(fn: => Future[T])(shouldStop: T => Boolean)(implicit ex: ExecutionContext): Future[T] = n match { - case i if n <= maxRetryNum => - fn.flatMap { result => - if (!shouldStop(result)) { - logger.info(s"retryOnSuccess $n") - retryOnSuccess(maxRetryNum, n + 1)(fn)(shouldStop) - } else { - Future.successful(result) - } - } - case _ => fn - } - - def retryOnFailure[T](maxRetryNum: Int, n: Int = 1)(fn: => Future[T])(fallback: => T)(implicit ex: ExecutionContext): Future[T] = n match { - case i if n <= maxRetryNum => - fn recoverWith { case t: Throwable => - logger.info(s"retryOnFailure $n $t") - retryOnFailure(maxRetryNum, n + 1)(fn)(fallback) - } - case _ => - Future.successful(fallback) - } - - - implicit class DeferOps[T](d: Deferred[T])(implicit ex: ExecutionContext) { - - def withCallback[R](op: T => R): Deferred[R] = { - d.addCallback(new Callback[R, T] { - override def call(arg: T): R = op(arg) - }) - } - - def recoverWith(op: Exception => T): Deferred[T] = { - d.addErrback(new Callback[Deferred[T], Exception] { - override def call(e: Exception): Deferred[T] = Deferred.fromResult(op(e)) - }) - } - - - def toFuture: Future[T] = { - val promise = Promise[T] - - d.addBoth(new Callback[Unit, T] { - def call(arg: T) = arg match { - case e: Exception => promise.failure(e) - case _ => promise.success(arg) - } - }) - - promise.future - } - - def toFutureWith(fallback: => T): Future[T] = { - toFuture recoverWith { case t: Throwable => Future.successful(fallback) } - } - - } - - implicit class ConfigOps(config: Config) { - def getBooleanWithFallback(key: String, defaultValue: Boolean): Boolean = - if (config.hasPath(key)) config.getBoolean(key) else defaultValue - } -} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2core/src/main/scala/com/kakao/s2graph/core/utils/FutureCache.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/utils/FutureCache.scala b/s2core/src/main/scala/com/kakao/s2graph/core/utils/FutureCache.scala deleted file mode 100644 index 17d9e8f..0000000 --- a/s2core/src/main/scala/com/kakao/s2graph/core/utils/FutureCache.scala +++ /dev/null @@ -1,82 +0,0 @@ -package com.kakao.s2graph.core.utils - -import java.util.concurrent.TimeUnit - -import com.google.common.cache.CacheBuilder -import com.typesafe.config.Config - -import scala.concurrent.{Promise, Future, ExecutionContext} - - -class FutureCache[R](config: Config)(implicit ex: ExecutionContext) { - - type Value = (Long, Future[R]) - - private val maxSize = config.getInt("future.cache.max.size") - private val expireAfterWrite = config.getInt("future.cache.expire.after.write") - private val expireAfterAccess = config.getInt("future.cache.expire.after.access") - - private val futureCache = CacheBuilder.newBuilder() - .initialCapacity(maxSize) - .concurrencyLevel(Runtime.getRuntime.availableProcessors()) - .expireAfterWrite(expireAfterWrite, TimeUnit.MILLISECONDS) - .expireAfterAccess(expireAfterAccess, TimeUnit.MILLISECONDS) - .maximumSize(maxSize).build[java.lang.Long, (Long, Promise[R])]() - - - def asMap() = futureCache.asMap() - - def getIfPresent(cacheKey: Long): Value = { - val (cachedAt, promise) = futureCache.getIfPresent(cacheKey) - (cachedAt, promise.future) - } - - private def checkAndExpire(cacheKey: Long, - cachedAt: Long, - cacheTTL: Long, - oldFuture: Future[R])(op: => Future[R]): Future[R] = { - if (System.currentTimeMillis() >= cachedAt + cacheTTL) { - // future is too old. so need to expire and fetch new data from storage. - futureCache.asMap().remove(cacheKey) - - val newPromise = Promise[R] - val now = System.currentTimeMillis() - - futureCache.asMap().putIfAbsent(cacheKey, (now, newPromise)) match { - case null => - // only one thread succeed to come here concurrently - // initiate fetch to storage then add callback on complete to finish promise. - op.onSuccess { case value => - newPromise.success(value) - value - } - newPromise.future - case (cachedAt, oldPromise) => oldPromise.future - } - } else { - // future is not to old so reuse it. - oldFuture - } - } - def getOrElseUpdate(cacheKey: Long, cacheTTL: Long)(op: => Future[R]): Future[R] = { - val cacheVal = futureCache.getIfPresent(cacheKey) - cacheVal match { - case null => - val promise = Promise[R] - val now = System.currentTimeMillis() - val (cachedAt, cachedPromise) = futureCache.asMap().putIfAbsent(cacheKey, (now, promise)) match { - case null => - op.onSuccess { case value => - promise.success(value) - value - } - (now, promise) - case oldVal => oldVal - } - checkAndExpire(cacheKey, cacheTTL, cachedAt, cachedPromise.future)(op) - - case (cachedAt, cachedPromise) => - checkAndExpire(cacheKey, cacheTTL, cachedAt, cachedPromise.future)(op) - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2core/src/main/scala/com/kakao/s2graph/core/utils/Logger.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/utils/Logger.scala b/s2core/src/main/scala/com/kakao/s2graph/core/utils/Logger.scala deleted file mode 100644 index d281017..0000000 --- a/s2core/src/main/scala/com/kakao/s2graph/core/utils/Logger.scala +++ /dev/null @@ -1,44 +0,0 @@ -package com.kakao.s2graph.core.utils - -import play.api.libs.json.JsValue -import org.slf4j.LoggerFactory - -import scala.language.{higherKinds, implicitConversions} - -object logger { - - trait Loggable[T] { - def toLogMessage(msg: T): String - } - - object Loggable { - implicit val stringLoggable = new Loggable[String] { - def toLogMessage(msg: String) = msg - } - - implicit def numericLoggable[T: Numeric] = new Loggable[T] { - def toLogMessage(msg: T) = msg.toString - } - - implicit val jsonLoggable = new Loggable[JsValue] { - def toLogMessage(msg: JsValue) = msg.toString() - } - - implicit val booleanLoggable = new Loggable[Boolean] { - def toLogMessage(msg: Boolean) = msg.toString() - } - } - - private val logger = LoggerFactory.getLogger("application") - private val errorLogger = LoggerFactory.getLogger("error") - - def info[T: Loggable](msg: => T) = logger.info(implicitly[Loggable[T]].toLogMessage(msg)) - - def debug[T: Loggable](msg: => T) = logger.debug(implicitly[Loggable[T]].toLogMessage(msg)) - - def error[T: Loggable](msg: => T, exception: => Throwable) = errorLogger.error(implicitly[Loggable[T]].toLogMessage(msg), exception) - - def error[T: Loggable](msg: => T) = errorLogger.error(implicitly[Loggable[T]].toLogMessage(msg)) -} - - http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2core/src/main/scala/com/kakao/s2graph/core/utils/SafeUpdateCache.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/utils/SafeUpdateCache.scala b/s2core/src/main/scala/com/kakao/s2graph/core/utils/SafeUpdateCache.scala deleted file mode 100644 index d383b47..0000000 --- a/s2core/src/main/scala/com/kakao/s2graph/core/utils/SafeUpdateCache.scala +++ /dev/null @@ -1,62 +0,0 @@ -package com.kakao.s2graph.core.utils - -import java.util.concurrent.atomic.AtomicBoolean - -import com.google.common.cache.CacheBuilder - -import scala.concurrent.{ExecutionContext, Future} -import scala.util.{Failure, Success} - -object SafeUpdateCache { - - case class CacheKey(key: String) - -} - -class SafeUpdateCache[T](prefix: String, maxSize: Int, ttl: Int)(implicit executionContext: ExecutionContext) { - - import SafeUpdateCache._ - - implicit class StringOps(key: String) { - def toCacheKey = new CacheKey(prefix + ":" + key) - } - - def toTs() = (System.currentTimeMillis() / 1000).toInt - - private val cache = CacheBuilder.newBuilder().maximumSize(maxSize).build[CacheKey, (T, Int, AtomicBoolean)]() - - def put(key: String, value: T) = cache.put(key.toCacheKey, (value, toTs, new AtomicBoolean(false))) - - def invalidate(key: String) = cache.invalidate(key.toCacheKey) - - def withCache(key: String)(op: => T): T = { - val cacheKey = key.toCacheKey - val cachedValWithTs = cache.getIfPresent(cacheKey) - - if (cachedValWithTs == null) { - // fetch and update cache. - val newValue = op - cache.put(cacheKey, (newValue, toTs(), new AtomicBoolean(false))) - newValue - } else { - val (cachedVal, updatedAt, isUpdating) = cachedValWithTs - if (toTs() < updatedAt + ttl) cachedVal // in cache TTL - else { - val running = isUpdating.getAndSet(true) - if (running) cachedVal - else { - Future(op)(executionContext) onComplete { - case Failure(ex) => - cache.put(cacheKey, (cachedVal, toTs(), new AtomicBoolean(false))) // keep old value - logger.error(s"withCache update failed: $cacheKey") - case Success(newValue) => - cache.put(cacheKey, (newValue, toTs(), new AtomicBoolean(false))) // update new value - logger.info(s"withCache update success: $cacheKey") - } - cachedVal - } - } - } - } -} -
